Coverage for pyrc/core/network.py: 71%

397 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-13 16:59 +0200

1# ------------------------------------------------------------------------------- 

2# Copyright (C) 2026 Joel Kimmich, Tim Jourdan 

3# ------------------------------------------------------------------------------ 

4# License 

5# This file is part of PyRC, distributed under GPL-3.0-or-later. 

6# ------------------------------------------------------------------------------ 

7 

8from __future__ import annotations 

9import os 

10import pickle 

11from abc import abstractmethod 

12from copy import copy 

13from typing import Any, TYPE_CHECKING, Callable 

14 

15import numpy as np 

16from sympy import lambdify, latex, SparseMatrix, Matrix, diag, Symbol, Basic, ImmutableSparseMatrix, MutableSparseMatrix 

17import sympy as sym 

18import hashlib 

19import networkx as nx 

20from scipy.sparse import coo_matrix 

21 

22from pyrc.paths import run_folder 

23from pyrc.core.components.resistor import Resistor 

24from pyrc.core.inputs import InternalHeatSource 

25from pyrc.core.settings import initial_settings, Settings 

26from pyrc.core.solver.symbolic import SparseSymbolicEvaluator 

27from pyrc.core.solver.handler import InhomogeneousSystemHandler, SolveIVPHandler, HomogeneousSystemHandler 

28from pyrc.tools.errors import HighCourantNumberError 

29from pyrc.tools.functions import add_leading_underscore 

30 

31from pyrc.tools.science import build_jacobian 

32from pyrc.core.components.templates import RCObjects, RCSolution, EquationItem, initial_rc_objects, Cell 

33from pyrc.core.components.templates import solution_object 

34from pyrc.core.components.capacitor import Capacitor 

35 

36if TYPE_CHECKING: 

37 from pyrc.core.nodes import MassFlowNode 

38 from pyrc.core.components.templates import EquationItemInput 

39 from scipy.sparse import spmatrix, sparray 

40 

41 

42class RCNetwork: 

43 def __init__( 

44 self, 

45 save_to_pickle: bool = True, 

46 load_from_pickle: bool = True, 

47 load_solution: bool = True, 

48 num_cores_jacobian: int = 1, # TODO: Parallelization only works on linux currently (because it forks instead 

49 # of spawning it). Problem is: Sympy symbols are not picklable. 

50 settings: Settings = initial_settings, 

51 rc_objects: RCObjects = initial_rc_objects, 

52 rc_solution: RCSolution = solution_object, 

53 # strategy: str = "lambdify", 

54 continue_canceled: bool = True, 

55 ): 

56 """ 

57 

58 Parameters 

59 ---------- 

60 save_to_pickle 

61 load_from_pickle 

62 load_solution 

63 num_cores_jacobian 

64 settings 

65 rc_objects 

66 rc_solution 

67 continue_canceled : bool, optional 

68 If True, continue canceled simulations. 

69 Overwrites load_solution 

70 """ 

71 self.rc_objects: RCObjects = rc_objects 

72 self.settings = settings 

73 self._system_matrix_symbol: MutableSparseMatrix = None 

74 self._input_matrix_symbol: MutableSparseMatrix = None 

75 self._temperature_vector_symbol = None 

76 self._input_vector_symbol = None 

77 self._input_matrix = None 

78 self._system_matrix = None 

79 self.__hash = None 

80 self.rc_solution: RCSolution = rc_solution 

81 # link rc objects to rc solution 

82 self.rc_solution.rc_objects = rc_objects 

83 # self.strategy = strategy 

84 

85 # For each time dependent symbol the following values are not allowed because they would make an entry in the 

86 # system matrix be infinity 

87 self.forbidden_values: dict[Symbol, set] = {} 

88 

89 self.save_to_pickle: bool = save_to_pickle 

90 self.load_from_pickle: bool = load_from_pickle 

91 self.load_solution: bool = load_solution 

92 self.continue_canceled: bool = continue_canceled 

93 

94 if num_cores_jacobian is None: 

95 num_cores_jacobian = os.cpu_count() 

96 self.num_cores_jacobian = min(max(num_cores_jacobian, 1), os.cpu_count()) 

97 

98 self.groups: dict[str:Capacitor] = {} 

99 

100 def __getattr__(self, item): 

101 if getattr(self.rc_objects, item, None): 

102 return self.rc_objects.__getattribute__(item) 

103 return AttributeError 

104 

105 def create_network(self, *args, **kwargs): 

106 EquationItem.reset_counter() 

107 Resistor.reset_counter() 

108 self.rc_objects.wipe_all() 

109 self._create_network(*args, **kwargs) 

110 assert self.nodes is not None 

111 

    @abstractmethod
    def _create_network(self, *args, **kwargs):
        """Subclass hook that builds the network; must populate ``self.rc_objects``."""
        pass

115 

116 def copy_dict(self): 

117 return { 

118 "save_to_pickle": self.save_to_pickle, 

119 "load_from_pickle": self.load_from_pickle, 

120 "load_solution": self.load_solution, 

121 "num_cores_jacobian": self.num_cores_jacobian, 

122 "settings": copy(self.settings), 

123 } 

124 

125 def __copy__(self): 

126 return RCNetwork(**self.copy_dict()) 

127 

    @property
    def nodes(self):
        """All nodes of the network, as stored on ``rc_objects``."""
        return self.rc_objects.nodes

131 

    @property
    def hash(self):
        """
        Compute a deterministic hash for the `RCNetwork` where node identity (class name + obj.id) and topology matter.

        For Capacitor instances, an optional internal_heat_source (identified by its id) is encoded as a node attribute.

        For Cell subclass instances, position and delta arrays are encoded.

        Returns
        -------
        str :
            SHA-256 hex digest of the canonical edge and node representation.
        """
        # Computed once and cached; nothing in this class invalidates the cache, so the
        # hash reflects the topology at first access.
        if self.__hash is None:

            def node_key(_obj) -> str:
                # Identity = class name + object id — stable across runs, unlike id().
                return f"{type(_obj).__name__}:{_obj.id}"

            def array_str(arr: np.ndarray) -> str:
                # 17 significant digits round-trip IEEE-754 doubles exactly.
                return ",".join(f"{v:.17g}" for v in arr.ravel())

            graph = nx.Graph()
            for obj in self.rc_objects.all:
                attrs = {}
                if isinstance(obj, Capacitor):
                    attrs["heat_source_id"] = (
                        obj.internal_heat_source.id if obj.internal_heat_source is not None else None
                    )
                if isinstance(obj, Cell):
                    attrs["position"] = array_str(obj.position)
                    attrs["delta"] = array_str(obj.delta)
                graph.add_node(node_key(obj), **attrs)

            # loop over all linked/connected network objects
            for obj in self.rc_objects.all:
                for neighbour in obj.neighbours:
                    graph.add_edge(node_key(obj), node_key(neighbour))

            # Sort nodes, node attributes, and edges so the digest is independent of
            # insertion order; edges are normalized via min/max of their endpoints.
            node_str = "|".join(
                f"{nid}#{data}"
                for nid, data in sorted((nid, str(sorted(data.items()))) for nid, data in graph.nodes(data=True))
            )
            edge_str = "|".join(sorted(f"{min(u, v)}-{max(u, v)}" for u, v in graph.edges()))
            canonical = f"nodes:{node_str};edges:{edge_str}"
            self.__hash = hashlib.sha256(canonical.encode()).hexdigest()
        return self.__hash

179 

    @property
    def time_steps(self) -> list:
        """
        Returns a list with all time steps.

        Returns
        -------
        list :
            The time steps.
        """
        # Delegates to the solution object, which owns the time axis.
        return self.rc_solution.time_steps

191 

    def reset_properties(self):
        # Invalidate the cached temperature symbol vector so it is rebuilt lazily.
        # NOTE(review): the other caches (_input_vector_symbol, _system_matrix, ...)
        # are not reset here — confirm whether that is intentional.
        self._temperature_vector_symbol = None

194 

    @property
    def inputs(self) -> list[EquationItemInput]:
        """All input items registered on ``rc_objects``."""
        return self.rc_objects.inputs

198 

199 def change_static_dynamic(self, calculate_static: bool | str): 

200 if isinstance(calculate_static, str): 

201 match calculate_static.lower(): 

202 case "static" | "statisch" | "s": 

203 calculate_static = True 

204 case _: 

205 calculate_static = False 

206 self.settings.calculate_static = calculate_static 

207 

208 def get_node_by_id(self, _id: str | int): 

209 """ 

210 Returns the node with the given id. 

211 

212 Parameters 

213 ---------- 

214 _id : str | int 

215 The id of the node. 

216 

217 Returns 

218 ------- 

219 TemperatureNode | InternalHeatSource : 

220 """ 

221 if isinstance(_id, str): 

222 _id = int(_id) 

223 return next((x for x in self.rc_objects.all_equation_objects if x.id == _id), None) 

224 

225 def no_infinity(self, sp_matrix: SparseMatrix | spmatrix | sparray) -> bool: 

226 """ 

227 Checks the given sparse matrix for infinity entries and symbolic singularities. 

228 

229 Notes 

230 ----- 

231 Saves a dict, which maps each free sympy symbol to a set of values at which at least one symbolic entry 

232 diverges. Empty if no symbolic entries exist. 

233 

234 Parameters 

235 ---------- 

236 sp_matrix : SparseMatrix | spmatrix | sparray 

237 A sparse matrix/vector to check, may contain numeric values or sympy 

238 expressions. 

239 

240 Returns 

241 ------- 

242 tuple[bool, dict] : 

243 bool : True if no numeric infinity was found. 

244 """ 

245 sp_matrix: SparseMatrix | spmatrix | sparray 

246 if isinstance(sp_matrix, SparseMatrix) or isinstance(sp_matrix, ImmutableSparseMatrix): 

247 entries = sp_matrix.todok().values() 

248 else: 

249 sp_matrix: spmatrix | sparray 

250 entries = sp_matrix.data 

251 

252 for entry in entries: 

253 if isinstance(entry, Basic): 

254 if not entry.free_symbols: 

255 if entry in (sym.oo, -sym.oo, sym.zoo, sym.nan): 

256 return False 

257 else: 

258 _, denominator = sym.fraction(sym.cancel(entry)) 

259 if denominator != sym.Integer(1): 

260 for s in denominator.free_symbols: 

261 solution = sym.solve(denominator, s) 

262 self.forbidden_values.setdefault(s, set()).update(solution) 

263 else: 

264 if np.isinf(entry): 

265 return False 

266 

267 return True 

268 

269 def _get_matrix_symbol(self, key: str) -> SparseMatrix | ImmutableSparseMatrix: 

270 """ 

271 Returns the desired matrix in symbolic notation. 

272 

273 Parameters 

274 ---------- 

275 key : str 

276 Either "symbol" or "input" to determine the matrix type. 

277 

278 Returns 

279 ------- 

280 SparseMatrix | ImmutableSparseMatrix : 

281 The (sympy) symbolic notation of the matrix. 

282 """ 

283 attr = f"_{key}_matrix_symbol" 

284 if getattr(self, attr) is None: 

285 self.make_system_matrices() 

286 return getattr(self, attr) 

287 

288 def _build_matrix_function(self, key: str) -> Callable: 

289 """ 

290 Builds the desired matrix lambda function using the symbols matrix, respectively. 

291 

292 Parameters 

293 ---------- 

294 key : str 

295 Either "symbol" or "input" to determine the matrix type. 

296 

297 Returns 

298 ------- 

299 Callable : 

300 The function of the matrix. 

301 """ 

302 matrix_symbol = self._get_matrix_symbol(key) 

303 all_symbols = self.get_symbols() 

304 if self.get_time_dependent_symbols(): 

305 ordered_symbols = [ 

306 next((s for s in matrix_symbol.free_symbols if s.name == symbol.name), symbol) for symbol in all_symbols 

307 ] 

308 return lambdify(ordered_symbols, matrix_symbol, "sympy") 

309 return lambdify(all_symbols, matrix_symbol, "scipy") 

310 

311 def _build_matrix_direct(self, key: str) -> spmatrix: 

312 """ 

313 Bypasses lambdify by substituting all symbol values via name-based xreplace. 

314 

315 Parameters 

316 ---------- 

317 key : str 

318 Either "symbol" or "input" to determine the matrix type. 

319 

320 Notes 

321 ----- 

322 The replace must be done using the name of the symbols and not the objects itself because they might be loaded 

323 from disk and might not match, but the names do. 

324 

325 Returns 

326 ------- 

327 scipy.spmatrix : 

328 A scipy sparse matrix with all values substituted. 

329 """ 

330 matrix_symbol = self._get_matrix_symbol(key) 

331 name_to_value = {s.name: v for s, v in zip(self.get_symbols(), self.get_values())} 

332 free = matrix_symbol.free_symbols 

333 substitution_map = {s: name_to_value[s.name] for s in free if s.name in name_to_value} 

334 

335 dok = matrix_symbol.todok() 

336 rows, cols, data = [], [], [] 

337 for (i, j), expr in dok.items(): 

338 rows.append(i) 

339 cols.append(j) 

340 data.append(float(expr.xreplace(substitution_map))) 

341 

342 return coo_matrix((data, (rows, cols)), shape=matrix_symbol.shape).tocsr() 

343 

344 # import warnings 

345 # import logging 

346 # import pickle 

347 # from concurrent.futures import ProcessPoolExecutor 

348 # from pathlib import Path 

349 # from typing import Callable 

350 # 

351 # import numba 

352 # import numpy as np 

353 # from scipy.sparse import coo_matrix, csr_matrix 

354 # from sympy import lambdify 

355 # from sympy.printing import ccode 

356 # 

357 # def _xreplace_element(args: tuple) -> object: 

358 # """Apply xreplace to a single expression.""" 

359 # expr, static_map = args 

360 # return expr.xreplace(static_map) 

361 # 

362 # def _build_c_source(reduced_exprs: list, td_ordered: list, func_name: str) -> str: 

363 # """ 

364 # Generates a C source file with a single function iterating over all nonzero 

365 # expressions. 

366 # 

367 # Parameters 

368 # ---------- 

369 # reduced_exprs : list 

370 # Sympy expressions with static symbols already substituted. 

371 # td_ordered : list 

372 # Ordered time-dependent sympy symbols. 

373 # func_name : str 

374 # Name of the generated C function. 

375 # 

376 # Returns 

377 # ------- 

378 # str : 

379 # C source code as a string. 

380 # """ 

381 # args = ", ".join(f"double {s.name}" for s in td_ordered) 

382 # lines = [ 

383 # "#include <stddef.h>", 

384 # f"void {func_name}({args}, double *out, size_t n) {{", 

385 # ] 

386 # for i, expr in enumerate(reduced_exprs): 

387 # lines.append(f" out[{i}] = {ccode(expr)};") 

388 # lines.append("}") 

389 # return "\n".join(lines) 

390 # 

391 # def _compile_c_function( 

392 # source: str, 

393 # func_name: str, 

394 # n_out: int, 

395 # cache_path: Path, 

396 # ) -> Callable: 

397 # """ 

398 # Compiles C source to a shared library and returns a callable via ctypes. 

399 # 

400 # Parameters 

401 # ---------- 

402 # source : str 

403 # C source code string. 

404 # func_name : str 

405 # Name of the C function to load. 

406 # n_out : int 

407 # Number of output values (nnz). 

408 # cache_path : Path 

409 # Path to write the compiled .so / .dll. 

410 # 

411 # Returns 

412 # ------- 

413 # Callable : 

414 # Python callable wrapping the compiled C function. 

415 # """ 

416 # import ctypes 

417 # import subprocess 

418 # import tempfile 

419 # 

420 # src_path = cache_path.with_suffix(".c") 

421 # src_path.write_text(source) 

422 # 

423 # result = subprocess.run( 

424 # ["gcc", "-O3", "-shared", "-fPIC", "-o", str(cache_path), str(src_path)], 

425 # capture_output=True, 

426 # text=True, 

427 # ) 

428 # if result.returncode != 0: 

429 # raise RuntimeError(result.stderr) 

430 # 

431 # lib = ctypes.CDLL(str(cache_path)) 

432 # fn = getattr(lib, func_name) 

433 # fn.restype = None 

434 # 

435 # out_type = ctypes.c_double * n_out 

436 # 

437 # def c_callable(*td_vals: float) -> np.ndarray: 

438 # out = out_type() 

439 # fn(*[ctypes.c_double(v) for v in td_vals], out, ctypes.c_size_t(n_out)) 

440 # return np.frombuffer(out, dtype=np.float64).copy() 

441 # 

442 # return c_callable 

443 

444 # def _prepare_matrix_ingredients( 

445 # self, key: str, parallel: bool 

446 # ) -> tuple[list, list, np.ndarray, np.ndarray, tuple, list]: 

447 # """ 

448 # Extracts and reduces nonzero expressions, substituting all static symbols. 

449 # 

450 # Parameters 

451 # ---------- 

452 # key : str 

453 # Either "symbol" or "input" to determine the matrix type. 

454 # parallel : bool 

455 # If True, parallelizes static substitution via ProcessPoolExecutor. 

456 # 

457 # Returns 

458 # ------- 

459 # tuple : 

460 # td_ordered, reduced_exprs, rows, cols, shape, raw exprs (unreduced) 

461 # """ 

462 # matrix_symbol = self._get_matrix_symbol(key) 

463 # all_symbols = self.get_symbols() 

464 # all_values = self.get_values() 

465 # td_symbols = self.get_time_dependent_symbols() 

466 # td_names = {s.name for s in td_symbols} 

467 # 

468 # static_map = { 

469 # s_mat: val 

470 # for sym, val in zip(all_symbols, all_values) 

471 # if sym.name not in td_names 

472 # for s_mat in matrix_symbol.free_symbols 

473 # if s_mat.name == sym.name 

474 # } 

475 # 

476 # td_ordered = [ 

477 # next(s for s in matrix_symbol.free_symbols if s.name == td.name) 

478 # for td in td_symbols 

479 # if any(s.name == td.name for s in matrix_symbol.free_symbols) 

480 # ] 

481 # 

482 # dok = matrix_symbol.todok() 

483 # shape = matrix_symbol.shape 

484 # keys, exprs = zip(*dok.items()) if dok else ([], []) 

485 # rows = np.array([k[0] for k in keys], dtype=np.int32) 

486 # cols = np.array([k[1] for k in keys], dtype=np.int32) 

487 # 

488 # TODO: do not do it like that, it is worse than the solution that already exists. 

489 # if parallel: 

490 # tasks = [(expr, static_map) for expr in exprs] 

491 # with ProcessPoolExecutor() as executor: 

492 # reduced_exprs = list(executor.map(_xreplace_element, tasks)) 

493 # else: 

494 # reduced_exprs = [expr.xreplace(static_map) for expr in exprs] 

495 # 

496 # return td_ordered, reduced_exprs, rows, cols, shape 

497 # 

498 # TODO: This is worse than the current solution, but the other version (numby/c...) should be implemented to speed 

499 # things up. But attention: Currently, it is just vibe coded... 

500 # def _build_matrix_function_lambdify( 

501 # self, key: str, parallel: bool = False 

502 # ) -> Callable: 

503 # """ 

504 # Builds the matrix function using lambdify with numpy backend. 

505 # 

506 # Parameters 

507 # ---------- 

508 # key : str 

509 # Either "symbol" or "input" to determine the matrix type. 

510 # parallel : bool 

511 # If True, parallelizes static symbol substitution. 

512 # 

513 # Returns 

514 # ------- 

515 # Callable : 

516 # Function accepting td symbol values, returning a csr_matrix. 

517 # """ 

518 # td_ordered, reduced_exprs, rows, cols, shape = ( 

519 # self._prepare_matrix_ingredients(key, parallel) 

520 # ) 

521 # 

522 # if not len(rows): 

523 # empty = csr_matrix(shape, dtype=float) 

524 # 

525 # def assembled(*_) -> csr_matrix: 

526 # return empty 

527 # 

528 # return assembled 

529 # 

530 # data_func = lambdify(td_ordered, reduced_exprs, "numpy") 

531 # template = coo_matrix( 

532 # (np.ones(len(rows), dtype=float), (rows, cols)), shape=shape 

533 # ).tocsr() 

534 # 

535 # def assembled(*td_vals: float) -> csr_matrix: 

536 # template.data[:] = np.asarray(data_func(*td_vals), dtype=float) 

537 # return template 

538 # 

539 # return assembled 

540 # 

541 # def _build_matrix_function_numba(self, key: str, parallel: bool = False) -> Callable: 

542 # """ 

543 # Builds the matrix function using a numba-JIT compiled callable, with 

544 # caching to self.save_folder_path keyed by self.hash. 

545 # 

546 # Parameters 

547 # ---------- 

548 # key : str 

549 # Either "symbol" or "input" to determine the matrix type. 

550 # parallel : bool 

551 # If True, parallelizes static symbol substitution. 

552 # 

553 # Returns 

554 # ------- 

555 # Callable : 

556 # Function accepting td symbol values, returning a csr_matrix. 

557 # """ 

558 # td_ordered, reduced_exprs, rows, cols, shape = ( 

559 # self._prepare_matrix_ingredients(key, parallel) 

560 # ) 

561 # 

562 # if not len(rows): 

563 # empty = csr_matrix(shape, dtype=float) 

564 # 

565 # def assembled(*_) -> csr_matrix: 

566 # return empty 

567 # 

568 # return assembled 

569 # 

570 # cache_path = Path(self.save_folder_path) / f"{self.hash}_{key}_numba.pkl" 

571 # 

572 # if cache_path.exists(): 

573 # with open(cache_path, "rb") as f: 

574 # data_func = pickle.load(f) 

575 # else: 

576 # data_func = lambdify(td_ordered, reduced_exprs, "numpy") 

577 # data_func_nb = numba.njit(data_func, cache=True) 

578 # with open(cache_path, "wb") as f: 

579 # pickle.dump(data_func_nb, f) 

580 # data_func = data_func_nb 

581 # 

582 # template = coo_matrix( 

583 # (np.ones(len(rows), dtype=float), (rows, cols)), shape=shape 

584 # ).tocsr() 

585 # 

586 # def assembled(*td_vals: float) -> csr_matrix: 

587 # template.data[:] = np.asarray(data_func(*td_vals), dtype=float) 

588 # return template 

589 # 

590 # return assembled 

591 # 

592 # def _build_matrix_function_c(self, key: str, parallel: bool = False) -> Callable: 

593 # """ 

594 # Builds the matrix function using a compiled C shared library via ctypes, 

595 # falling back to numba if compilation fails. 

596 # 

597 # Parameters 

598 # ---------- 

599 # key : str 

600 # Either "symbol" or "input" to determine the matrix type. 

601 # parallel : bool 

602 # If True, parallelizes static symbol substitution. 

603 # 

604 # Returns 

605 # ------- 

606 # Callable : 

607 # Function accepting td symbol values, returning a csr_matrix. 

608 # """ 

609 # td_ordered, reduced_exprs, rows, cols, shape = ( 

610 # self._prepare_matrix_ingredients(key, parallel) 

611 # ) 

612 # 

613 # if not len(rows): 

614 # empty = csr_matrix(shape, dtype=float) 

615 # 

616 # def assembled(*_) -> csr_matrix: 

617 # return empty 

618 # 

619 # return assembled 

620 # 

621 # func_name = f"matrix_func_{self.hash}_{key}" 

622 # so_path = Path(self.save_folder_path) / f"{func_name}.so" 

623 # 

624 # try: 

625 # if so_path.exists(): 

626 # import ctypes 

627 # lib = ctypes.CDLL(str(so_path)) 

628 # fn = getattr(lib, func_name) 

629 # n_out = len(rows) 

630 # out_type = ctypes.c_double * n_out 

631 # 

632 # def data_func(*td_vals: float) -> np.ndarray: 

633 # out = out_type() 

634 # fn( 

635 # *[ctypes.c_double(v) for v in td_vals], 

636 # out, 

637 # ctypes.c_size_t(n_out), 

638 # ) 

639 # return np.frombuffer(out, dtype=np.float64).copy() 

640 # else: 

641 # source = _build_c_source(reduced_exprs, td_ordered, func_name) 

642 # data_func = _compile_c_function( 

643 # source, func_name, len(rows), so_path 

644 # ) 

645 # except Exception as e: 

646 # print( 

647 # f"C compilation failed for '{key}' matrix ({e}), falling back to numba." 

648 # ) 

649 # return self._build_matrix_function_numba(key, parallel=parallel) 

650 # 

651 # template = coo_matrix( 

652 # (np.ones(len(rows), dtype=float), (rows, cols)), shape=shape 

653 # ).tocsr() 

654 # 

655 # def assembled(*td_vals: float) -> csr_matrix: 

656 # template.data[:] = np.asarray(data_func(*td_vals), dtype=float) 

657 # return template 

658 # 

659 # return assembled 

660 # 

661 # def _build_matrix_function( 

662 # self, key: str, strategy: str | None = None 

663 # ) -> Callable: 

664 # """ 

665 # Dispatcher selecting the matrix function build strategy. 

666 # 

667 # Parameters 

668 # ---------- 

669 # key : str 

670 # Either "symbol" or "input" to determine the matrix type. 

671 # strategy : str | None 

672 # One of "lambdify", "parallel", "numba", "c". If None, falls back 

673 # to self.strategy set at __init__. 

674 # 

675 # Returns 

676 # ------- 

677 # Callable : 

678 # Function accepting td symbol values, returning a csr_matrix. 

679 # """ 

680 # s = strategy if strategy is not None else self.strategy 

681 # if s == "lambdify": 

682 # return self._build_matrix_function_lambdify(key, parallel=False) 

683 # if s == "parallel": 

684 # return self._build_matrix_function_lambdify(key, parallel=True) 

685 # if s == "numba": 

686 # return self._build_matrix_function_numba(key, parallel=False) 

687 # if s == "c": 

688 # return self._build_matrix_function_c(key, parallel=False) 

689 # raise ValueError(f"Unknown strategy '{s}'. Choose from: lambdify, parallel, numba, c.") 

690 # 

691 # def _build_matrix( 

692 # self, key: str, strategy: str | None = None 

693 # ) -> csr_matrix: 

694 # """ 

695 # Inputs all values into the desired matrix function. 

696 # 

697 # Parameters 

698 # ---------- 

699 # key : str 

700 # Either "symbol" or "input" to determine the matrix type. 

701 # strategy : str | None 

702 # Overrides self.strategy if provided. 

703 # 

704 # Returns 

705 # ------- 

706 # csr_matrix : 

707 # The desired matrix. 

708 # """ 

709 # attr = f"_{key}_matrix" 

710 # if getattr(self, attr) is None: 

711 # if not self.get_time_dependent_symbols(): 

712 # result = self._build_matrix_direct(key) 

713 # else: 

714 # result = self._build_matrix_function(key, strategy=strategy)( 

715 # *self.get_values() 

716 # ) 

717 # assert self.no_infinity(result), ( 

718 # f"Infinity detected in {key} matrix. System cannot be solved." 

719 # ) 

720 # setattr(self, attr, result) 

721 # return getattr(self, attr) 

722 

723 def _build_matrix(self, key: str) -> SparseMatrix | ImmutableSparseMatrix | spmatrix | sparray: 

724 """ 

725 Inputs all values (can be symbolic values) into the desired matrix function. 

726 

727 Parameters 

728 ---------- 

729 key : str 

730 Either "symbol" or "input" to determine the matrix type. 

731 

732 Returns 

733 ------- 

734 SparseMatrix | ImmutableSparseMatrix | spmatrix | sparray : 

735 The desired matrix. If no time dependent variables are used, it is a scipy sparse matrix, otherwise it's a 

736 sympy SparseMatrix. 

737 """ 

738 attr = f"_{key}_matrix" 

739 if getattr(self, attr) is None: 

740 if self.get_time_dependent_symbols(): 

741 result = self._build_matrix_function(key)(*self.get_values()) 

742 else: 

743 result = self._build_matrix_direct(key) 

744 assert self.no_infinity(result), f"Infinity detected in {key} matrix. System cannot be solved." 

745 setattr(self, attr, result) 

746 return getattr(self, attr) 

747 

    @property
    def system_matrix_symbol(self) -> SparseMatrix | ImmutableSparseMatrix:
        """The symbolic system matrix (built lazily on first access)."""
        return self._get_matrix_symbol("system")

751 

    @property
    def input_matrix_symbol(self) -> SparseMatrix | ImmutableSparseMatrix:
        """The symbolic input matrix (built lazily on first access)."""
        return self._get_matrix_symbol("input")

755 

    @property
    def system_matrix_function(self) -> Callable:
        """Lambdified function producing the system matrix from symbol values."""
        return self._build_matrix_function("system")

759 

    @property
    def input_matrix_function(self) -> Callable:
        """Lambdified function producing the input matrix from symbol values."""
        return self._build_matrix_function("input")

763 

    @property
    def system_matrix(self) -> SparseMatrix | ImmutableSparseMatrix | spmatrix | sparray:
        """The system matrix with all values substituted (cached)."""
        return self._build_matrix("system")

767 

    @property
    def input_matrix(self) -> SparseMatrix | ImmutableSparseMatrix | spmatrix | sparray:
        """The input matrix with all values substituted (cached)."""
        return self._build_matrix("input")

771 

772 @property 

773 def temperature_vector_symbol(self) -> list: 

774 if self._temperature_vector_symbol is None: 

775 result = [node.temperature_symbol for node in self.nodes] 

776 self._temperature_vector_symbol = result 

777 return self._temperature_vector_symbol 

778 

779 @property 

780 def temperature_vector(self) -> np.ndarray: 

781 result = [node.temperature for node in self.nodes] 

782 return np.array(result).flatten() 

783 

784 @property 

785 def input_vector(self) -> np.ndarray: 

786 result = [n for input_item in self.inputs for n in input_item.values] 

787 return np.array(result).flatten() 

788 

789 @property 

790 def input_vector_symbol(self) -> np.ndarray: 

791 if self._input_vector_symbol is None: 

792 result = [] 

793 if self.inputs is not None and self.inputs != []: 

794 result = [n for input_item in self.inputs for n in input_item.symbols] 

795 self._input_vector_symbol = np.array(result).flatten() 

796 return self._input_vector_symbol 

797 

798 # @property 

799 # def seconds_of_day(self): 

800 # """ 

801 # Returns the seconds that have already passed on the day of ``self.start_time``. 

802 # 

803 # Returns 

804 # ------- 

805 # float | int : 

806 # The passed seconds of the day. 

807 # """ 

808 # return (self.settings.start_date - self.settings.start_date.astype("datetime64[D]")) / np.timedelta64(1, "s") 

809 

810 @property 

811 def save_folder_path(self): 

812 if self.settings.save_folder_path is None: 

813 return run_folder 

814 return self.settings.save_folder_path 

815 

816 @property 

817 def pickle_path(self) -> str: 

818 filename = f"{self.hash}.pickle" 

819 return os.path.normpath(os.path.join(self.save_folder_path, filename)) 

820 

821 def pickle_path_result(self, t_span, name_add_on=None): 

822 name_add_on = add_leading_underscore(name_add_on) 

823 filename = f"{self.hash}{name_add_on}_{int(t_span[1])}_result.pickle" 

824 return os.path.normpath(os.path.join(self.save_folder_path, filename)) 

825 

826 @property 

827 def pickle_path_single_solution(self) -> str: 

828 filename = f"{self.hash}_single_solution.pickle" 

829 return os.path.normpath(os.path.join(self.save_folder_path, filename)) 

830 

831 def pickle_path_solution(self, t_span, name_add_on=None) -> str: 

832 name_add_on = add_leading_underscore(name_add_on) 

833 filename = f"{self.hash}{name_add_on}_{int(t_span[0])}_{int(t_span[1])}_solution.pickle" 

834 return os.path.normpath(os.path.join(self.save_folder_path, filename)) 

835 

836 def save_matrices(self): 

837 if self.save_to_pickle: 

838 file_path = self.pickle_path 

839 matrices = [ 

840 self._system_matrix_symbol, 

841 self._input_matrix_symbol, 

842 self._input_vector_symbol, 

843 self._temperature_vector_symbol, 

844 self.forbidden_values, 

845 ] 

846 with open(file_path, "wb") as f: 

847 pickle.dump(matrices, f) 

848 

849 def load_matrices(self): 

850 file_path = self.pickle_path 

851 if os.path.exists(file_path): 

852 with open(file_path, "rb") as f: 

853 matrices = pickle.load(f) 

854 self._system_matrix_symbol = matrices[0] 

855 self._input_matrix_symbol = matrices[1] 

856 self._input_vector_symbol = matrices[2] 

857 self._temperature_vector_symbol = matrices[3] 

858 self.forbidden_values = matrices[4] 

859 print("matrices loaded") 

860 return True 

861 else: 

862 print("No network found nor loaded.") 

863 return False 

864 

    def save_last_step_solution(self, pickle_path_single_solution: str | None = None):
        """
        Saves the last time step solution.

        Parameters
        ----------
        pickle_path_single_solution : str | None, optional
            Target path; defaults to ``self.pickle_path_single_solution``.
        """
        if pickle_path_single_solution is None:
            pickle_path_single_solution = self.pickle_path_single_solution
        file_path = pickle_path_single_solution
        self.rc_solution.save_last_step(file_path)

873 

874 def load_initial_values(self, return_bool: bool = False, pickle_path_single_solution: str = None) -> bool: 

875 """ 

876 Loads the last time step solution (temperature vector) and sets it as initial values. 

877 """ 

878 if pickle_path_single_solution is None: 

879 pickle_path_single_solution = self.pickle_path_single_solution 

880 file_path = pickle_path_single_solution 

881 if os.path.exists(file_path): 

882 with open(file_path, "rb") as f: 

883 solution_tuple = pickle.load(f) 

884 temp_vector = solution_tuple[0] 

885 input_vector = solution_tuple[1] 

886 for node, temp in zip(self.rc_objects.nodes, temp_vector): 

887 node.initial_temperature = temp 

888 for item, value in zip(self.inputs, input_vector): 

889 item.initial_value = value 

890 if return_bool: 

891 return True 

892 else: 

893 print("No temperature vector found nor loaded.") 

894 if return_bool: 

895 return False 

896 

897 @property 

898 def inhomogeneous_system(self) -> bool: 

899 """ 

900 Returns True if inputs are existing and the network is inhomogeneous. False otherwise. 

901 

902 Returns 

903 ------- 

904 bool : 

905 """ 

906 if self.rc_objects.inputs is None or self.rc_objects.inputs == []: 

907 return False 

908 return True 

909 

910 def solve_network( 

911 self, 

912 t_span: tuple | Any = None, 

913 print_progress=False, 

914 name_add_on: str = "", 

915 check_courant: bool = True, 

916 time_dependent_function: Callable = None, 

917 hook_function: Callable = None, 

918 expected_solution_size_mb=5000, 

919 **kwargs: dict[str : int | Any], 

920 ): 

921 """ 

922 Solves the network at the given times. 

923 

924 If time_vector is None, one second is calculated. 

925 

926 Parameters 

927 ---------- 

928 t_span : tuple | Any 

929 Interval of integration (t0, tf). The solver starts with t=t0 and integrates until it reaches t=tf. 

930 Both t0 and tf must be floats or values interpretable by the float conversion function. 

931 print_progress : bool, optional 

932 If True, some print outs are made during solving to get the time step that is currently simulated. 

933 name_add_on : str, optional 

934 Optional add-on to the name of in-between saves that is placed after the hash value (separated by "_"). 

935 Example save name: 

936 42395d3d9f07f06ce9c9cb609b406aa54fc2dc5e8c4d9cc75987d9a02541ad2f_nameAddOn_zw_000001080_h.pickle 

937 check_courant : bool, optional 

938 If True, self.check_courant(0.4) is run before simulating. 

939 time_dependent_function : Callable 

940 A function returning the values for all time dependent symbols that are in the matrices in the same order as 

941 self.get_time_dependent_symbols(). 

942 This function is required if time dependent symbols exist. 

943 It must return an iterable (e.g. list) 

944 hook_function : Callable, optional 

945 A function that is run before the network is solved using solve_ivp. 

946 Can be used to catch the time before the solving actually starts (e.g. used in `PerformanceTest`\). 

947 expected_solution_size_mb : int | float, optional 

948 The expected solution size in Megabytes. Is used for RAM-Management in ``SystemHandler.solve()``\. 

949 The solution size depends on the size of the RC network and number of saved time steps. 

950 kwargs : dict[str: int | Any], optional 

951 Passed to SystemHandler. 

952 

953 Returns 

954 ------- 

955 scipy.integrate._ivp.ivp.OdeResult : 

956 The solution of the network (time and temperatures of all nodes). 

957 See: https://docs.scipy.org/doc/scipy/reference/generated/scipy.integrate.solve_ivp.html#solve-ivp 

958 """ 

959 if self.get_time_dependent_symbols(): 

960 assert time_dependent_function 

961 

962 continued_simulation = False 

963 if self.load_solution or self.continue_canceled: 

964 success = self.rc_solution.load_solution( 

965 self.pickle_path_solution(t_span, name_add_on), last_time_step=t_span[-1] 

966 ) 

967 if not success: 

968 success = self.rc_solution.load_solution( 

969 self.pickle_path_result(t_span, name_add_on), last_time_step=t_span[-1] 

970 ) 

971 if success or (not isinstance(success, bool) and success == 0): # ==0 because 0 could be the last time 

972 # step (will never be the case.... -.-) 

973 if not isinstance(success, bool): 

974 if not self.continue_canceled: 

975 self.rc_solution.delete_solution_except_first() 

976 print(f"Partial solution detected but not used because continue_canceled is False.") 

977 else: 

978 print( 

979 f"Simulation is started at loaded time step {success} on {success / t_span[-1] * 100:.2f} %." 

980 ) 

981 t_span = (success, t_span[-1]) 

982 continued_simulation = True 

983 else: 

984 print("Solution was loaded from file.") 

985 return None 

986 

987 max_step = 1 

988 if check_courant and (self.rc_objects.mass_flow_nodes is not None or self.rc_objects.mass_flow_nodes != []): 

989 loop_counter = 0 

990 while loop_counter < 1000: 

991 try: 

992 self.check_courant(max_step) # approximated maximum time step used during iterations. 

993 # if no error 

994 break 

995 except HighCourantNumberError: 

996 loop_counter += 1 

997 max_step *= 0.99 

998 

999 temperature_vector = np.array(self.temperature_vector).flatten() 

1000 system_matrix = self.system_matrix 

1001 time_symbols = self.get_time_dependent_symbols() 

1002 if time_symbols: 

1003 system_matrix = SparseSymbolicEvaluator(system_matrix, time_symbols) 

1004 

1005 system_handler_kwargs = { 

1006 "system_matrix": system_matrix, 

1007 "rc_solution": self.rc_solution, 

1008 "settings": self.settings, 

1009 "print_progress": print_progress, 

1010 "print_points": np.linspace(t_span[0], t_span[-1], 20+1), 

1011 "batch_end": t_span[-1], 

1012 "time_dependent_function": time_dependent_function, 

1013 } 

1014 

1015 # create the time_steps to solve the results (every X seconds a value) 

1016 t_eval = np.linspace( 

1017 t_span[0], t_span[1], max(1, int((t_span[1] - t_span[0]) / self.settings.save_all_x_seconds) + 1) 

1018 ) 

1019 

1020 if self.inhomogeneous_system: 

1021 input_matrix = self.input_matrix 

1022 if time_symbols: 

1023 input_matrix = SparseSymbolicEvaluator(input_matrix, time_symbols) 

1024 input_vector = self.input_vector 

1025 

1026 # Each Input can be called as a function to calculate new boundary values during simulation: 

1027 functions_list = self.rc_objects.inputs 

1028 # Also each Input has some functions that can be called independently of the object that return values for 

1029 # the calculation. 

1030 kwargs_functions = {} 

1031 for item in self.rc_objects.inputs: 

1032 kwargs_functions.update(item.get_kwargs_functions()) # Values should be overwritten so that the same 

1033 # values are calculated only once 

1034 

1035 system_handler_kwargs.update( 

1036 { 

1037 "input_matrix": input_matrix, 

1038 "input_vector": input_vector, 

1039 "functions_list": functions_list, 

1040 "kwargs_functions": kwargs_functions, 

1041 "t_eval": t_eval, 

1042 "first_time": t_span[0], 

1043 } 

1044 ) 

1045 

1046 # only update the keys that are in the predefined dictionary (it would also work if everything is passed anyway) 

1047 system_handler_kwargs.update({k: kwargs[k] for k in system_handler_kwargs if k in kwargs}) 

1048 

1049 system_handler = self.system_handler_type(**system_handler_kwargs) 

1050 save_prefix = f"{self.hash}" 

1051 if name_add_on: 

1052 save_prefix += f"_{name_add_on.removeprefix('_').removesuffix('_')}" 

1053 solve_handler = SolveIVPHandler( 

1054 system_handler=system_handler, 

1055 save_prefix=save_prefix, 

1056 solve_settings=self.settings.solve_settings, 

1057 ) 

1058 if hook_function is not None: 

1059 hook_function() 

1060 solve_handler.solve( 

1061 t_span=t_span, 

1062 y0=temperature_vector, 

1063 t_eval=t_eval, 

1064 continued_simulation=continued_simulation, 

1065 expected_solution_size_mb=expected_solution_size_mb, 

1066 ) 

1067 

1068 @property 

1069 def system_handler_type(self): 

1070 """ 

1071 Returns the SystemHandler type depending on which system (in/homogeneous) is needed. 

1072 

1073 Returns 

1074 ------- 

1075 type : 

1076 The system handler type. 

1077 """ 

1078 if self.inhomogeneous_system: 

1079 return InhomogeneousSystemHandler 

1080 return HomogeneousSystemHandler 

1081 

1082 @property 

1083 def all_objects(self) -> list: 

1084 return [*(self.rc_objects.resistors or []), *(self.rc_objects.nodes or []), *(self.rc_objects.inputs or [])] 

1085 

1086 @property 

1087 def objects_with_identical_symbols(self) -> list: 

1088 """ 

1089 Get all objects with symbols that are relevant for the system matrix. 

1090 

1091 You cannot just use every object, because some (e.g. Resistors) can be summarized in local groups with one 

1092 symbol representing the whole group, like the equivalent resistance of multiple connected Resistors. 

1093 

1094 No ``InternalHeatSource`` s are returned, because their symbol is already represented in the corresponding Node. 

1095 

1096 Returns 

1097 ------- 

1098 list : 

1099 All network objects without the ones that are already taken into account by the symbols of another. 

1100 The list has the same order every time it is created. 

1101 """ 

1102 

1103 def filter_resistors(resistor_list): 

1104 # Create set of all resistors for O(1) lookups 

1105 all_resistors = {obj for obj in resistor_list if isinstance(obj, Resistor)} 

1106 to_exclude = set() 

1107 

1108 # Collect all resistors to exclude in order of appearance 

1109 for obj in resistor_list: 

1110 if isinstance(obj, Resistor) and obj not in to_exclude: 

1111 inbetween = set(obj.all_resistors_inbetween) - {obj} 

1112 to_exclude.update(inbetween & all_resistors) 

1113 

1114 # Filter maintaining original order 

1115 return [obj for obj in resistor_list if not obj in to_exclude] 

1116 

1117 result = [rc_obj for rc_obj in self.all_objects if not isinstance(rc_obj, InternalHeatSource)] 

1118 return filter_resistors(result) 

1119 

1120 def get_symbols(self): 

1121 """ 

1122 Returns a list with all symbols, except time dependent symbols. 

1123 

1124 Returns 

1125 ------- 

1126 list : 

1127 All symbols, except time dependent symbols. 

1128 """ 

1129 all_symbols = [symb for rc_object in self.objects_with_identical_symbols for symb in rc_object.symbols] 

1130 return all_symbols 

1131 

1132 def get_values(self): 

1133 """ 

1134 Returns a list with all values, except of time dependent symbols. 

1135 

1136 Returns 

1137 ------- 

1138 list : 

1139 All values, except of time dependent symbols. 

1140 """ 

1141 all_values = [ 

1142 val if isinstance(val, Basic) else np.float64(val) 

1143 for rc_object in self.objects_with_identical_symbols 

1144 for val in rc_object.values 

1145 ] 

1146 return all_values 

1147 

1148 def get_time_dependent_symbols(self) -> list: 

1149 """ 

1150 A list with all time dependent symbols but each of it is only added once. 

1151 

1152 Returns 

1153 ------- 

1154 list : 

1155 The time dependent symbols. 

1156 """ 

1157 symbols = [] 

1158 seen = set() 

1159 for rc_object in self.objects_with_identical_symbols: 

1160 for symb in rc_object.time_dependent_symbols: 

1161 if symb not in seen: 

1162 seen.add(symb) 

1163 symbols.append(symb) 

1164 return symbols 

1165 

1166 def get_symbols_and_values(self) -> tuple[list, list]: 

1167 """ 

1168 Return two lists of first: all symbols and second: all associated values of all rc objects. 

1169 

1170 Returns 

1171 ------- 

1172 tuple[list, list] : 

1173 

1174 """ 

1175 return self.get_symbols(), self.get_values() 

1176 

    def make_system_matrices(self):
        """
        Creates the Jacobian matrices (system matrix and input matrix) of the RC Network.

        If self.load_from_pickle the matrices will be loaded from a pickle file if it exists to prevent heavy
        calculations.

        The results are stored on ``self._system_matrix_symbol`` and ``self._input_matrix_symbol`` and
        persisted via ``self.save_matrices()``; nothing is returned.
        """
        success = False
        if self.load_from_pickle:
            try:
                success = self.load_matrices()
            # Best effort: any load failure silently falls back to rebuilding the matrices from scratch.
            except Exception:
                pass
        if not success:
            # fill system matrix
            terms = []
            involved_symbols: list[set] = []
            temperature_symbols: list[set] = []
            variables = []
            for node in self.nodes:
                # works only for one term
                term, t_symbols, all_symbols = node.temperature_derivative_term()
                terms.append(term)
                involved_symbols.append(all_symbols)
                temperature_symbols.append(t_symbols)
                variables.append(node.temperature_symbol)

            # Jacobian of the temperature derivative terms w.r.t. the node temperature symbols.
            self._system_matrix_symbol = build_jacobian(
                terms, variables, temperature_symbols, num_cores=self.num_cores_jacobian
            )
            print("system matrix created")

            # Checkpoint: persist the system matrix before the (potentially long) input-matrix build.
            self.save_matrices()

            # Jacobian of the same terms w.r.t. the input symbols.
            input_variables = list(self.input_vector_symbol)
            self._input_matrix_symbol = build_jacobian(
                terms, input_variables, involved_symbols, num_cores=self.num_cores_jacobian
            )
            print("input matrix created")

            self.save_matrices()

1223 

1224 def check_courant(self, time_step: float): 

1225 """ 

1226 Calculates the Courant number for the whole network and raises an error if its larger than 1. 

1227 

1228 Parameters 

1229 ---------- 

1230 time_step : float 

1231 The maximum time step in seconds. 

1232 

1233 Raises 

1234 ------ 

1235 HighCourantNumberError : 

1236 If the courant number is greater than 1 the network will calculate shit. 

1237 

1238 """ 

1239 # TODO: Do not use the courant number but return a critical maximum time_step to easy raise an error during 

1240 # iterations. So: no iteration, but calculate the maximum time_step and return this so that it can be set. 

1241 mass_flow_nodes = self.rc_objects.mass_flow_nodes 

1242 mass_flow_node: MassFlowNode 

1243 for mass_flow_node in mass_flow_nodes: 

1244 if mass_flow_node.courant_number(time_step) > 1: 

1245 raise HighCourantNumberError 

1246 

1247 def matrix_to_latex_diag(self): 

1248 matrix = self.system_matrix_symbol 

1249 diag_elements = matrix.diagonal() 

1250 matrix_no_diag = matrix - diag(*diag_elements) 

1251 

1252 matrix_latex = latex(matrix_no_diag) 

1253 diag_latex = latex(Matrix(diag_elements).reshape(len(diag_elements), 1)) 

1254 

1255 return f"{matrix_latex} + \\mathrm{{diag}}\\left({diag_latex}\\right)"