Coverage for pyrc \ core \ network.py: 70%

416 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-07 08:20 +0200

1# ------------------------------------------------------------------------------- 

2# Copyright (C) 2026 Joel Kimmich, Tim Jourdan 

3# ------------------------------------------------------------------------------ 

4# License 

5# This file is part of PyRC, distributed under GPL-3.0-or-later. 

6# ------------------------------------------------------------------------------ 

7 

8from __future__ import annotations 

9import os 

10import pickle 

11from abc import abstractmethod 

12from copy import copy 

13from typing import Any, TYPE_CHECKING, Callable 

14 

15import numpy as np 

16from sympy import lambdify, latex, SparseMatrix, Matrix, diag, Symbol, Basic, ImmutableSparseMatrix, MutableSparseMatrix 

17import sympy as sym 

18import hashlib 

19import networkx as nx 

20from scipy.sparse import coo_matrix 

21 

22from pyrc.core.solver.stationary import solve_stationary 

23from pyrc.paths import run_folder 

24from pyrc.core.components.resistor import Resistor 

25from pyrc.core.inputs import InternalHeatSource 

26from pyrc.core.settings import initial_settings, Settings 

27from pyrc.core.solver.symbolic import SparseSymbolicEvaluator 

28from pyrc.core.solver.handler import InhomogeneousSystemHandler, SolveIVPHandler, HomogeneousSystemHandler 

29from pyrc.tools.errors import HighCourantNumberError 

30from pyrc.tools.functions import add_leading_underscore 

31 

32from pyrc.tools.science import build_jacobian 

33from pyrc.core.components.templates import RCObjects, RCSolution, EquationItem, initial_rc_objects, Cell 

34from pyrc.core.components.templates import solution_object 

35from pyrc.core.components.capacitor import Capacitor 

36 

37if TYPE_CHECKING: 

38 from pyrc.core.nodes import MassFlowNode 

39 from pyrc.core.components.templates import EquationItemInput 

40 from scipy.sparse import spmatrix, sparray 

41 

42 

43class RCNetwork: 

44 def __init__( 

45 self, 

46 save_to_pickle: bool = True, 

47 load_from_pickle: bool = True, 

48 load_solution: bool = True, 

49 num_cores_jacobian: int = 1, # TODO: Parallelization only works on linux currently (because it forks instead 

50 # of spawning it). Problem is: Sympy symbols are not picklable. 

51 settings: Settings = initial_settings, 

52 rc_objects: RCObjects = initial_rc_objects, 

53 rc_solution: RCSolution = solution_object, 

54 # strategy: str = "lambdify", 

55 continue_canceled: bool = True, 

56 ): 

57 """ 

58 

59 Parameters 

60 ---------- 

61 save_to_pickle 

62 load_from_pickle 

63 load_solution 

64 num_cores_jacobian 

65 settings 

66 rc_objects 

67 rc_solution 

68 continue_canceled : bool, optional 

69 If True, continue canceled simulations. 

70 Overwrites load_solution 

71 """ 

72 self.rc_objects: RCObjects = rc_objects 

73 self.settings = settings 

74 self._system_matrix_symbol: MutableSparseMatrix = None 

75 self._input_matrix_symbol: MutableSparseMatrix = None 

76 self._temperature_vector_symbol = None 

77 self._input_vector_symbol = None 

78 self._input_matrix = None 

79 self._system_matrix = None 

80 self.__hash = None 

81 self.rc_solution: RCSolution = rc_solution 

82 # link rc objects to rc solution 

83 self.rc_solution.rc_objects = rc_objects 

84 # self.strategy = strategy 

85 

86 # For each time dependent symbol the following values are not allowed because they would make an entry in the 

87 # system matrix be infinity 

88 self.forbidden_values: dict[Symbol, set] = {} 

89 

90 self.save_to_pickle: bool = save_to_pickle 

91 self.load_from_pickle: bool = load_from_pickle 

92 self.load_solution: bool = load_solution 

93 self.continue_canceled: bool = continue_canceled 

94 

95 if num_cores_jacobian is None: 

96 num_cores_jacobian = os.cpu_count() 

97 self.num_cores_jacobian = min(max(num_cores_jacobian, 1), os.cpu_count()) 

98 

99 self.groups: dict[str, Capacitor] = {} 

100 

101 def __getattr__(self, item): 

102 if getattr(self.rc_objects, item, None): 

103 return self.rc_objects.__getattribute__(item) 

104 return AttributeError 

105 

106 def create_network(self, *args, **kwargs): 

107 EquationItem.reset_counter() 

108 Resistor.reset_counter() 

109 self.rc_objects.wipe_all() 

110 self._create_network(*args, **kwargs) 

111 assert self.nodes is not None 

112 

113 @abstractmethod 

114 def _create_network(self, *args, **kwargs): 

115 pass 

116 

117 def copy_dict(self): 

118 return { 

119 "save_to_pickle": self.save_to_pickle, 

120 "load_from_pickle": self.load_from_pickle, 

121 "load_solution": self.load_solution, 

122 "num_cores_jacobian": self.num_cores_jacobian, 

123 "settings": copy(self.settings), 

124 } 

125 

126 def __copy__(self): 

127 return RCNetwork(**self.copy_dict()) 

128 

129 @property 

130 def nodes(self): 

131 return self.rc_objects.nodes 

132 

133 @property 

134 def hash(self): 

135 """ 

136 Compute a deterministic hash for the :class:`RCNetwork` where node identity (class name + obj.id) and topology matter. 

137 

138 For :class:`Capacitor` instances, an optional internal_heat_source (identified by its id) is encoded as a node attribute. 

139 

140 For :class:`Cell` subclass instances, position and delta arrays are encoded. 

141 

142 Returns 

143 ------- 

144 str : 

145 SHA-256 hex digest of the canonical edge and node representation. 

146 """ 

147 if self.__hash is None: 

148 

149 def node_key(_obj) -> str: 

150 return f"{type(_obj).__name__}:{_obj.id}" 

151 

152 def array_str(arr: np.ndarray) -> str: 

153 return ",".join(f"{v:.17g}" for v in arr.ravel()) 

154 

155 graph = nx.Graph() 

156 for obj in self.rc_objects.all: 

157 attrs = {} 

158 if isinstance(obj, Capacitor): 

159 attrs["heat_source_id"] = ( 

160 obj.internal_heat_source.id if obj.internal_heat_source is not None else None 

161 ) 

162 if isinstance(obj, Cell): 

163 attrs["position"] = array_str(obj.position) 

164 attrs["delta"] = array_str(obj.delta) 

165 graph.add_node(node_key(obj), **attrs) 

166 

167 # loop over all linked/connected network objects 

168 for obj in self.rc_objects.all: 

169 for neighbour in obj.neighbours: 

170 graph.add_edge(node_key(obj), node_key(neighbour)) 

171 

172 node_str = "|".join( 

173 f"{nid}#{data}" 

174 for nid, data in sorted((nid, str(sorted(data.items()))) for nid, data in graph.nodes(data=True)) 

175 ) 

176 edge_str = "|".join(sorted(f"{min(u, v)}-{max(u, v)}" for u, v in graph.edges())) 

177 canonical = f"nodes:{node_str};edges:{edge_str}" 

178 self.__hash = hashlib.sha256(canonical.encode()).hexdigest() 

179 return self.__hash 

180 

181 @property 

182 def time_steps(self) -> list: 

183 """ 

184 Returns a list with all time steps. 

185 

186 Returns 

187 ------- 

188 list : 

189 The time steps. 

190 """ 

191 return self.rc_solution.time_steps 

192 

193 def reset_properties(self): 

194 self._temperature_vector_symbol = None 

195 

196 @property 

197 def inputs(self) -> list[EquationItemInput]: 

198 return self.rc_objects.inputs 

199 

200 def change_static_dynamic(self, calculate_static: bool | str): 

201 if isinstance(calculate_static, str): 

202 match calculate_static.lower(): 

203 case "static" | "statisch" | "s": 

204 calculate_static = True 

205 case _: 

206 calculate_static = False 

207 self.settings.calculate_static = calculate_static 

208 

209 def get_node_by_id(self, _id: str | int): 

210 """ 

211 Returns the node with the given id. 

212 

213 Parameters 

214 ---------- 

215 _id : str | int 

216 The id of the node. 

217 

218 Returns 

219 ------- 

220 TemperatureNode | InternalHeatSource : 

221 """ 

222 if isinstance(_id, str): 

223 _id = int(_id) 

224 return next((x for x in self.rc_objects.all_equation_objects if x.id == _id), None) 

225 

226 def no_infinity(self, sp_matrix: SparseMatrix | spmatrix | sparray) -> bool: 

227 """ 

228 Checks the given sparse matrix for infinity entries and symbolic singularities. 

229 

230 Notes 

231 ----- 

232 Saves a dict, which maps each free sympy symbol to a set of values at which at least one symbolic entry 

233 diverges. Empty if no symbolic entries exist. 

234 

235 Parameters 

236 ---------- 

237 sp_matrix : SparseMatrix | spmatrix | sparray 

238 A sparse matrix/vector to check, may contain numeric values or sympy 

239 expressions. 

240 

241 Returns 

242 ------- 

243 tuple[bool, dict] : 

244 bool : True if no numeric infinity was found. 

245 """ 

246 sp_matrix: SparseMatrix | spmatrix | sparray 

247 if isinstance(sp_matrix, SparseMatrix) or isinstance(sp_matrix, ImmutableSparseMatrix): 

248 entries = sp_matrix.todok().values() 

249 else: 

250 sp_matrix: spmatrix | sparray 

251 entries = sp_matrix.data 

252 

253 for entry in entries: 

254 if isinstance(entry, Basic): 

255 if not entry.free_symbols: 

256 if entry in (sym.oo, -sym.oo, sym.zoo, sym.nan): 

257 return False 

258 else: 

259 _, denominator = sym.fraction(sym.cancel(entry)) 

260 if denominator != sym.Integer(1): 

261 for s in denominator.free_symbols: 

262 solution = sym.solve(denominator, s) 

263 self.forbidden_values.setdefault(s, set()).update(solution) 

264 else: 

265 if np.isinf(entry): 

266 return False 

267 

268 return True 

269 

270 def _get_matrix_symbol(self, key: str) -> SparseMatrix | ImmutableSparseMatrix: 

271 """ 

272 Returns the desired matrix in symbolic notation. 

273 

274 Parameters 

275 ---------- 

276 key : str 

277 Either "symbol" or "input" to determine the matrix type. 

278 

279 Returns 

280 ------- 

281 SparseMatrix | ImmutableSparseMatrix : 

282 The (sympy) symbolic notation of the matrix. 

283 """ 

284 attr = f"_{key}_matrix_symbol" 

285 if getattr(self, attr) is None: 

286 self.make_system_matrices() 

287 return getattr(self, attr) 

288 

289 def _build_matrix_function(self, key: str) -> Callable: 

290 """ 

291 Builds the desired matrix lambda function using the symbols matrix, respectively. 

292 

293 Parameters 

294 ---------- 

295 key : str 

296 Either "symbol" or "input" to determine the matrix type. 

297 

298 Returns 

299 ------- 

300 Callable : 

301 The function of the matrix. 

302 """ 

303 matrix_symbol = self._get_matrix_symbol(key) 

304 all_symbols = self.get_symbols() 

305 if self.get_time_dependent_symbols(): 

306 ordered_symbols = [ 

307 next((s for s in matrix_symbol.free_symbols if s.name == symbol.name), symbol) for symbol in all_symbols 

308 ] 

309 return lambdify(ordered_symbols, matrix_symbol, "sympy") 

310 return lambdify(all_symbols, matrix_symbol, "scipy") 

311 

312 def _build_matrix_direct(self, key: str) -> spmatrix: 

313 """ 

314 Bypasses lambdify by substituting all symbol values via name-based xreplace. 

315 

316 Parameters 

317 ---------- 

318 key : str 

319 Either "symbol" or "input" to determine the matrix type. 

320 

321 Notes 

322 ----- 

323 The replace must be done using the name of the symbols and not the objects itself because they might be loaded 

324 from disk and might not match, but the names do. 

325 

326 Returns 

327 ------- 

328 scipy.spmatrix : 

329 A scipy sparse matrix with all values substituted. 

330 """ 

331 matrix_symbol = self._get_matrix_symbol(key) 

332 name_to_value = {s.name: v for s, v in zip(self.get_symbols(), self.get_values())} 

333 free = matrix_symbol.free_symbols 

334 substitution_map = {s: name_to_value[s.name] for s in free if s.name in name_to_value} 

335 

336 dok = matrix_symbol.todok() 

337 rows, cols, data = [], [], [] 

338 for (i, j), expr in dok.items(): 

339 rows.append(i) 

340 cols.append(j) 

341 data.append(float(expr.xreplace(substitution_map))) 

342 

343 return coo_matrix((data, (rows, cols)), shape=matrix_symbol.shape).tocsr() 

344 

345 # import warnings 

346 # import logging 

347 # import pickle 

348 # from concurrent.futures import ProcessPoolExecutor 

349 # from pathlib import Path 

350 # from typing import Callable 

351 # 

352 # import numba 

353 # import numpy as np 

354 # from scipy.sparse import coo_matrix, csr_matrix 

355 # from sympy import lambdify 

356 # from sympy.printing import ccode 

357 # 

358 # def _xreplace_element(args: tuple) -> object: 

359 # """Apply xreplace to a single expression.""" 

360 # expr, static_map = args 

361 # return expr.xreplace(static_map) 

362 # 

363 # def _build_c_source(reduced_exprs: list, td_ordered: list, func_name: str) -> str: 

364 # """ 

365 # Generates a C source file with a single function iterating over all nonzero 

366 # expressions. 

367 # 

368 # Parameters 

369 # ---------- 

370 # reduced_exprs : list 

371 # Sympy expressions with static symbols already substituted. 

372 # td_ordered : list 

373 # Ordered time-dependent sympy symbols. 

374 # func_name : str 

375 # Name of the generated C function. 

376 # 

377 # Returns 

378 # ------- 

379 # str : 

380 # C source code as a string. 

381 # """ 

382 # args = ", ".join(f"double {s.name}" for s in td_ordered) 

383 # lines = [ 

384 # "#include <stddef.h>", 

385 # f"void {func_name}({args}, double *out, size_t n) {{", 

386 # ] 

387 # for i, expr in enumerate(reduced_exprs): 

388 # lines.append(f" out[{i}] = {ccode(expr)};") 

389 # lines.append("}") 

390 # return "\n".join(lines) 

391 # 

392 # def _compile_c_function( 

393 # source: str, 

394 # func_name: str, 

395 # n_out: int, 

396 # cache_path: Path, 

397 # ) -> Callable: 

398 # """ 

399 # Compiles C source to a shared library and returns a callable via ctypes. 

400 # 

401 # Parameters 

402 # ---------- 

403 # source : str 

404 # C source code string. 

405 # func_name : str 

406 # Name of the C function to load. 

407 # n_out : int 

408 # Number of output values (nnz). 

409 # cache_path : Path 

410 # Path to write the compiled .so / .dll. 

411 # 

412 # Returns 

413 # ------- 

414 # Callable : 

415 # Python callable wrapping the compiled C function. 

416 # """ 

417 # import ctypes 

418 # import subprocess 

419 # import tempfile 

420 # 

421 # src_path = cache_path.with_suffix(".c") 

422 # src_path.write_text(source) 

423 # 

424 # result = subprocess.run( 

425 # ["gcc", "-O3", "-shared", "-fPIC", "-o", str(cache_path), str(src_path)], 

426 # capture_output=True, 

427 # text=True, 

428 # ) 

429 # if result.returncode != 0: 

430 # raise RuntimeError(result.stderr) 

431 # 

432 # lib = ctypes.CDLL(str(cache_path)) 

433 # fn = getattr(lib, func_name) 

434 # fn.restype = None 

435 # 

436 # out_type = ctypes.c_double * n_out 

437 # 

438 # def c_callable(*td_vals: float) -> np.ndarray: 

439 # out = out_type() 

440 # fn(*[ctypes.c_double(v) for v in td_vals], out, ctypes.c_size_t(n_out)) 

441 # return np.frombuffer(out, dtype=np.float64).copy() 

442 # 

443 # return c_callable 

444 

445 # def _prepare_matrix_ingredients( 

446 # self, key: str, parallel: bool 

447 # ) -> tuple[list, list, np.ndarray, np.ndarray, tuple, list]: 

448 # """ 

449 # Extracts and reduces nonzero expressions, substituting all static symbols. 

450 # 

451 # Parameters 

452 # ---------- 

453 # key : str 

454 # Either "symbol" or "input" to determine the matrix type. 

455 # parallel : bool 

456 # If True, parallelizes static substitution via ProcessPoolExecutor. 

457 # 

458 # Returns 

459 # ------- 

460 # tuple : 

461 # td_ordered, reduced_exprs, rows, cols, shape, raw exprs (unreduced) 

462 # """ 

463 # matrix_symbol = self._get_matrix_symbol(key) 

464 # all_symbols = self.get_symbols() 

465 # all_values = self.get_values() 

466 # td_symbols = self.get_time_dependent_symbols() 

467 # td_names = {s.name for s in td_symbols} 

468 # 

469 # static_map = { 

470 # s_mat: val 

471 # for sym, val in zip(all_symbols, all_values) 

472 # if sym.name not in td_names 

473 # for s_mat in matrix_symbol.free_symbols 

474 # if s_mat.name == sym.name 

475 # } 

476 # 

477 # td_ordered = [ 

478 # next(s for s in matrix_symbol.free_symbols if s.name == td.name) 

479 # for td in td_symbols 

480 # if any(s.name == td.name for s in matrix_symbol.free_symbols) 

481 # ] 

482 # 

483 # dok = matrix_symbol.todok() 

484 # shape = matrix_symbol.shape 

485 # keys, exprs = zip(*dok.items()) if dok else ([], []) 

486 # rows = np.array([k[0] for k in keys], dtype=np.int32) 

487 # cols = np.array([k[1] for k in keys], dtype=np.int32) 

488 # 

489 # TODO: do not do it like that, it is worse than the solution that already exists. 

490 # if parallel: 

491 # tasks = [(expr, static_map) for expr in exprs] 

492 # with ProcessPoolExecutor() as executor: 

493 # reduced_exprs = list(executor.map(_xreplace_element, tasks)) 

494 # else: 

495 # reduced_exprs = [expr.xreplace(static_map) for expr in exprs] 

496 # 

497 # return td_ordered, reduced_exprs, rows, cols, shape 

498 # 

499 # TODO: This is worse than the current solution, but the other version (numby/c...) should be implemented to speed 

500 # things up. But attention: Currently, it is just vibe coded... 

501 # def _build_matrix_function_lambdify( 

502 # self, key: str, parallel: bool = False 

503 # ) -> Callable: 

504 # """ 

505 # Builds the matrix function using lambdify with numpy backend. 

506 # 

507 # Parameters 

508 # ---------- 

509 # key : str 

510 # Either "symbol" or "input" to determine the matrix type. 

511 # parallel : bool 

512 # If True, parallelizes static symbol substitution. 

513 # 

514 # Returns 

515 # ------- 

516 # Callable : 

517 # Function accepting td symbol values, returning a csr_matrix. 

518 # """ 

519 # td_ordered, reduced_exprs, rows, cols, shape = ( 

520 # self._prepare_matrix_ingredients(key, parallel) 

521 # ) 

522 # 

523 # if not len(rows): 

524 # empty = csr_matrix(shape, dtype=float) 

525 # 

526 # def assembled(*_) -> csr_matrix: 

527 # return empty 

528 # 

529 # return assembled 

530 # 

531 # data_func = lambdify(td_ordered, reduced_exprs, "numpy") 

532 # template = coo_matrix( 

533 # (np.ones(len(rows), dtype=float), (rows, cols)), shape=shape 

534 # ).tocsr() 

535 # 

536 # def assembled(*td_vals: float) -> csr_matrix: 

537 # template.data[:] = np.asarray(data_func(*td_vals), dtype=float) 

538 # return template 

539 # 

540 # return assembled 

541 # 

542 # def _build_matrix_function_numba(self, key: str, parallel: bool = False) -> Callable: 

543 # """ 

544 # Builds the matrix function using a numba-JIT compiled callable, with 

545 # caching to self.save_folder_path keyed by self.hash. 

546 # 

547 # Parameters 

548 # ---------- 

549 # key : str 

550 # Either "symbol" or "input" to determine the matrix type. 

551 # parallel : bool 

552 # If True, parallelizes static symbol substitution. 

553 # 

554 # Returns 

555 # ------- 

556 # Callable : 

557 # Function accepting td symbol values, returning a csr_matrix. 

558 # """ 

559 # td_ordered, reduced_exprs, rows, cols, shape = ( 

560 # self._prepare_matrix_ingredients(key, parallel) 

561 # ) 

562 # 

563 # if not len(rows): 

564 # empty = csr_matrix(shape, dtype=float) 

565 # 

566 # def assembled(*_) -> csr_matrix: 

567 # return empty 

568 # 

569 # return assembled 

570 # 

571 # cache_path = Path(self.save_folder_path) / f"{self.hash}_{key}_numba.pkl" 

572 # 

573 # if cache_path.exists(): 

574 # with open(cache_path, "rb") as f: 

575 # data_func = pickle.load(f) 

576 # else: 

577 # data_func = lambdify(td_ordered, reduced_exprs, "numpy") 

578 # data_func_nb = numba.njit(data_func, cache=True) 

579 # with open(cache_path, "wb") as f: 

580 # pickle.dump(data_func_nb, f) 

581 # data_func = data_func_nb 

582 # 

583 # template = coo_matrix( 

584 # (np.ones(len(rows), dtype=float), (rows, cols)), shape=shape 

585 # ).tocsr() 

586 # 

587 # def assembled(*td_vals: float) -> csr_matrix: 

588 # template.data[:] = np.asarray(data_func(*td_vals), dtype=float) 

589 # return template 

590 # 

591 # return assembled 

592 # 

593 # def _build_matrix_function_c(self, key: str, parallel: bool = False) -> Callable: 

594 # """ 

595 # Builds the matrix function using a compiled C shared library via ctypes, 

596 # falling back to numba if compilation fails. 

597 # 

598 # Parameters 

599 # ---------- 

600 # key : str 

601 # Either "symbol" or "input" to determine the matrix type. 

602 # parallel : bool 

603 # If True, parallelizes static symbol substitution. 

604 # 

605 # Returns 

606 # ------- 

607 # Callable : 

608 # Function accepting td symbol values, returning a csr_matrix. 

609 # """ 

610 # td_ordered, reduced_exprs, rows, cols, shape = ( 

611 # self._prepare_matrix_ingredients(key, parallel) 

612 # ) 

613 # 

614 # if not len(rows): 

615 # empty = csr_matrix(shape, dtype=float) 

616 # 

617 # def assembled(*_) -> csr_matrix: 

618 # return empty 

619 # 

620 # return assembled 

621 # 

622 # func_name = f"matrix_func_{self.hash}_{key}" 

623 # so_path = Path(self.save_folder_path) / f"{func_name}.so" 

624 # 

625 # try: 

626 # if so_path.exists(): 

627 # import ctypes 

628 # lib = ctypes.CDLL(str(so_path)) 

629 # fn = getattr(lib, func_name) 

630 # n_out = len(rows) 

631 # out_type = ctypes.c_double * n_out 

632 # 

633 # def data_func(*td_vals: float) -> np.ndarray: 

634 # out = out_type() 

635 # fn( 

636 # *[ctypes.c_double(v) for v in td_vals], 

637 # out, 

638 # ctypes.c_size_t(n_out), 

639 # ) 

640 # return np.frombuffer(out, dtype=np.float64).copy() 

641 # else: 

642 # source = _build_c_source(reduced_exprs, td_ordered, func_name) 

643 # data_func = _compile_c_function( 

644 # source, func_name, len(rows), so_path 

645 # ) 

646 # except Exception as e: 

647 # print( 

648 # f"C compilation failed for '{key}' matrix ({e}), falling back to numba." 

649 # ) 

650 # return self._build_matrix_function_numba(key, parallel=parallel) 

651 # 

652 # template = coo_matrix( 

653 # (np.ones(len(rows), dtype=float), (rows, cols)), shape=shape 

654 # ).tocsr() 

655 # 

656 # def assembled(*td_vals: float) -> csr_matrix: 

657 # template.data[:] = np.asarray(data_func(*td_vals), dtype=float) 

658 # return template 

659 # 

660 # return assembled 

661 # 

662 # def _build_matrix_function( 

663 # self, key: str, strategy: str | None = None 

664 # ) -> Callable: 

665 # """ 

666 # Dispatcher selecting the matrix function build strategy. 

667 # 

668 # Parameters 

669 # ---------- 

670 # key : str 

671 # Either "symbol" or "input" to determine the matrix type. 

672 # strategy : str | None 

673 # One of "lambdify", "parallel", "numba", "c". If None, falls back 

674 # to self.strategy set at __init__. 

675 # 

676 # Returns 

677 # ------- 

678 # Callable : 

679 # Function accepting td symbol values, returning a csr_matrix. 

680 # """ 

681 # s = strategy if strategy is not None else self.strategy 

682 # if s == "lambdify": 

683 # return self._build_matrix_function_lambdify(key, parallel=False) 

684 # if s == "parallel": 

685 # return self._build_matrix_function_lambdify(key, parallel=True) 

686 # if s == "numba": 

687 # return self._build_matrix_function_numba(key, parallel=False) 

688 # if s == "c": 

689 # return self._build_matrix_function_c(key, parallel=False) 

690 # raise ValueError(f"Unknown strategy '{s}'. Choose from: lambdify, parallel, numba, c.") 

691 # 

692 # def _build_matrix( 

693 # self, key: str, strategy: str | None = None 

694 # ) -> csr_matrix: 

695 # """ 

696 # Inputs all values into the desired matrix function. 

697 # 

698 # Parameters 

699 # ---------- 

700 # key : str 

701 # Either "symbol" or "input" to determine the matrix type. 

702 # strategy : str | None 

703 # Overrides self.strategy if provided. 

704 # 

705 # Returns 

706 # ------- 

707 # csr_matrix : 

708 # The desired matrix. 

709 # """ 

710 # attr = f"_{key}_matrix" 

711 # if getattr(self, attr) is None: 

712 # if not self.get_time_dependent_symbols(): 

713 # result = self._build_matrix_direct(key) 

714 # else: 

715 # result = self._build_matrix_function(key, strategy=strategy)( 

716 # *self.get_values() 

717 # ) 

718 # assert self.no_infinity(result), ( 

719 # f"Infinity detected in {key} matrix. System cannot be solved." 

720 # ) 

721 # setattr(self, attr, result) 

722 # return getattr(self, attr) 

723 

724 def _build_matrix(self, key: str) -> SparseMatrix | ImmutableSparseMatrix | spmatrix | sparray: 

725 """ 

726 Inputs all values (can be symbolic values) into the desired matrix function. 

727 

728 Parameters 

729 ---------- 

730 key : str 

731 Either "symbol" or "input" to determine the matrix type. 

732 

733 Returns 

734 ------- 

735 SparseMatrix | ImmutableSparseMatrix | spmatrix | sparray : 

736 The desired matrix. If no time dependent variables are used, it is a scipy sparse matrix, otherwise it's a 

737 sympy SparseMatrix. 

738 """ 

739 attr = f"_{key}_matrix" 

740 if getattr(self, attr) is None: 

741 if self.get_time_dependent_symbols(): 

742 result = self._build_matrix_function(key)(*self.get_values()) 

743 else: 

744 result = self._build_matrix_direct(key) 

745 assert self.no_infinity(result), f"Infinity detected in {key} matrix. System cannot be solved." 

746 setattr(self, attr, result) 

747 return getattr(self, attr) 

748 

749 @property 

750 def system_matrix_symbol(self) -> SparseMatrix | ImmutableSparseMatrix: 

751 return self._get_matrix_symbol("system") 

752 

753 @property 

754 def input_matrix_symbol(self) -> SparseMatrix | ImmutableSparseMatrix: 

755 return self._get_matrix_symbol("input") 

756 

757 @property 

758 def system_matrix_function(self) -> Callable: 

759 return self._build_matrix_function("system") 

760 

761 @property 

762 def input_matrix_function(self) -> Callable: 

763 return self._build_matrix_function("input") 

764 

765 @property 

766 def system_matrix(self) -> SparseMatrix | ImmutableSparseMatrix | spmatrix | sparray: 

767 return self._build_matrix("system") 

768 

769 @property 

770 def input_matrix(self) -> SparseMatrix | ImmutableSparseMatrix | spmatrix | sparray: 

771 return self._build_matrix("input") 

772 

773 @property 

774 def temperature_vector_symbol(self) -> list: 

775 if self._temperature_vector_symbol is None: 

776 result = [node.temperature_symbol for node in self.nodes] 

777 self._temperature_vector_symbol = result 

778 return self._temperature_vector_symbol 

779 

780 @property 

781 def temperature_vector(self) -> np.ndarray: 

782 result = [node.temperature for node in self.nodes] 

783 return np.array(result).flatten() 

784 

785 @property 

786 def input_vector(self) -> np.ndarray: 

787 result = [n for input_item in self.inputs for n in input_item.values] 

788 return np.array(result).flatten() 

789 

790 @property 

791 def input_vector_symbol(self) -> np.ndarray: 

792 if self._input_vector_symbol is None: 

793 result = [] 

794 if self.inputs is not None and self.inputs != []: 

795 result = [n for input_item in self.inputs for n in input_item.symbols] 

796 self._input_vector_symbol = np.array(result).flatten() 

797 return self._input_vector_symbol 

798 

799 # @property 

800 # def seconds_of_day(self): 

801 # """ 

802 # Returns the seconds that have already passed on the day of ``self.start_time``. 

803 # 

804 # Returns 

805 # ------- 

806 # float | int : 

807 # The passed seconds of the day. 

808 # """ 

809 # return (self.settings.start_date - self.settings.start_date.astype("datetime64[D]")) / np.timedelta64(1, "s") 

810 

811 @property 

812 def save_folder_path(self): 

813 if self.settings.save_folder_path is None: 

814 return run_folder 

815 return self.settings.save_folder_path 

816 

817 @property 

818 def pickle_path(self) -> str: 

819 filename = f"{self.hash}.pickle" 

820 return os.path.normpath(os.path.join(self.save_folder_path, filename)) 

821 

822 def pickle_path_result(self, t_span, name_add_on=None): 

823 name_add_on = add_leading_underscore(name_add_on) 

824 filename = f"{self.hash}{name_add_on}_{int(t_span[1])}_result.pickle" 

825 return os.path.normpath(os.path.join(self.save_folder_path, filename)) 

826 

827 @property 

828 def pickle_path_single_solution(self) -> str: 

829 filename = f"{self.hash}_single_solution.pickle" 

830 return os.path.normpath(os.path.join(self.save_folder_path, filename)) 

831 

832 def pickle_path_solution(self, t_span, name_add_on=None) -> str: 

833 name_add_on = add_leading_underscore(name_add_on) 

834 filename = f"{self.hash}{name_add_on}_{int(t_span[0])}_{int(t_span[1])}_solution.pickle" 

835 return os.path.normpath(os.path.join(self.save_folder_path, filename)) 

836 

837 def save_matrices(self): 

838 if self.save_to_pickle: 

839 file_path = self.pickle_path 

840 matrices = [ 

841 self._system_matrix_symbol, 

842 self._input_matrix_symbol, 

843 self._input_vector_symbol, 

844 self._temperature_vector_symbol, 

845 self.forbidden_values, 

846 ] 

847 with open(file_path, "wb") as f: 

848 pickle.dump(matrices, f) 

849 

850 def load_matrices(self): 

851 file_path = self.pickle_path 

852 if os.path.exists(file_path): 

853 with open(file_path, "rb") as f: 

854 matrices = pickle.load(f) 

855 self._system_matrix_symbol = matrices[0] 

856 self._input_matrix_symbol = matrices[1] 

857 self._input_vector_symbol = matrices[2] 

858 self._temperature_vector_symbol = matrices[3] 

859 self.forbidden_values = matrices[4] 

860 print("matrices loaded") 

861 return True 

862 else: 

863 print("No network found nor loaded.") 

864 return False 

865 

866 def save_last_step_solution(self, pickle_path_single_solution: str = None): 

867 """ 

868 Saves the last time step solution. 

869 """ 

870 if pickle_path_single_solution is None: 

871 pickle_path_single_solution = self.pickle_path_single_solution 

872 file_path = pickle_path_single_solution 

873 self.rc_solution.save_last_step(file_path) 

874 

875 def load_initial_values(self, return_bool: bool = False, pickle_path_single_solution: str = None) -> None | bool: 

876 """ 

877 Loads the last time step solution (temperature vector) and sets it as initial values. 

878 """ 

879 if pickle_path_single_solution is None: 

880 pickle_path_single_solution = self.pickle_path_single_solution 

881 file_path = pickle_path_single_solution 

882 if os.path.exists(file_path): 

883 with open(file_path, "rb") as f: 

884 solution_tuple = pickle.load(f) 

885 temp_vector = solution_tuple[0] 

886 input_vector = solution_tuple[1] 

887 for node, temp in zip(self.rc_objects.nodes, temp_vector): 

888 node.initial_temperature = temp 

889 for item, value in zip(self.inputs, input_vector): 

890 item.initial_value = value 

891 if return_bool: 

892 return True 

893 else: 

894 print("No temperature vector found nor loaded.") 

895 if return_bool: 

896 return False 

897 return None 

898 

899 @property 

900 def inhomogeneous_system(self) -> bool: 

901 """ 

902 Returns True if inputs are existing and the network is inhomogeneous. False otherwise. 

903 

904 Returns 

905 ------- 

906 bool : 

907 True if inputs are existing and the network is inhomogeneous. False otherwise. 

908 """ 

909 if self.rc_objects.inputs is None or self.rc_objects.inputs == []: 

910 return False 

911 return True 

912 

913 def determine_max_step(self) -> float: 

914 max_step = 1 

915 if self.rc_objects.mass_flow_nodes is not None or self.rc_objects.mass_flow_nodes != []: 

916 loop_counter = 0 

917 while loop_counter < 1000: 

918 try: 

919 self.check_courant(max_step) # approximated maximum time step used during iterations. 

920 # if no error 

921 break 

922 except HighCourantNumberError: 

923 loop_counter += 1 

924 max_step *= 0.99 

925 if loop_counter >= 1000: 

926 raise HighCourantNumberError 

927 return max_step 

928 

929 def solve_stationary(self, time_step: float | int = 0): 

930 """ 

931 Determines the analytic solution of the network (if it exists). It works for symbolic and numeric matrices. 

932 

933 Parameters 

934 ---------- 

935 time_step : float | int, optional 

936 The time step as which the solution is saved in the rc_solution object. 

937 This parameter has no effect if the network is symbolic. 

938 

939 Returns 

940 ------- 

941 sp.Matrix | MutableDenseMatrix or None: 

942 Stationary symbolic temperature vector of shape (n, 1) (only if result is symbolic, otherwise nothing is 

943 returned and the result is written to the rc_solution object using time_step). 

944 """ 

945 if self.inhomogeneous_system: 

946 result = solve_stationary( 

947 system_matrix=self.system_matrix, 

948 input_matrix=self.input_matrix, 

949 input_vector=self.input_vector, 

950 ) 

951 else: 

952 result = solve_stationary( 

953 system_matrix=self.system_matrix, 

954 ) 

955 

956 if isinstance(result, Basic) and result.free_symbols is not None: 

957 # return symbolic result 

958 return result 

959 else: 

960 # save result in rc_solution 

961 self.rc_solution.add_to_solution([np.array(time_step).reshape(-1,)], [np.array(result).reshape(-1, 1)]) 

962 

963 def solve_network( 

964 self, 

965 t_span: tuple | Any = None, 

966 print_progress=False, 

967 name_add_on: str = "", 

968 check_courant: bool = True, 

969 time_dependent_function: Callable = None, 

970 hook_function: Callable = None, 

971 expected_solution_size_mb=5000, 

972 **kwargs: dict[str, int | Any], 

973 ) -> None: 

974 """ 

975 Solves the network at the given times. 

976 

977 If time_vector is None, one second is calculated. 

978 

979 Parameters 

980 ---------- 

981 t_span : tuple | Any 

982 Interval of integration (t0, tf). The solver starts with t=t0 and integrates until it reaches t=tf. 

983 Both t0 and tf must be floats or values interpretable by the float conversion function. 

984 print_progress : bool, optional 

985 If True, some print-outs are made during solving to get the time step that is currently simulated. 

986 name_add_on : str, optional 

987 Optional add-on to the name of in-between saves that is placed after the hash value (separated by "_"). 

988 Example save name: 

989 42395d3d9f07f06ce9c9cb609b406aa54fc2dc5e8c4d9cc75987d9a02541ad2f_nameAddOn_zw_000001080_h.pickle 

990 check_courant : bool, optional 

991 If True, self.check_courant(0.4) is run before simulating. 

992 time_dependent_function : Callable 

993 A function returning the values for all time dependent symbols that are in the matrices in the same order as 

994 self.get_time_dependent_symbols(). 

995 This function is required if time dependent symbols exist. 

996 It must return an iterable (e.g. list) 

997 hook_function : Callable, optional 

998 A function that is run before the network is solved using solve_ivp. 

999 Can be used to catch the time before the solving actually starts (e.g. used in `PerformanceTest`\\). 

1000 expected_solution_size_mb : int | float, optional 

1001 The expected solution size in Megabytes. Is used for RAM-Management in ``SystemHandler.solve()``\\. 

1002 The solution size depends on the size of the RC network and number of saved time steps. 

1003 kwargs : dict[str: int | Any], optional 

1004 Passed to SystemHandler. 

1005 """ 

1006 if self.get_time_dependent_symbols(): 

1007 assert time_dependent_function 

1008 

1009 continued_simulation = False 

1010 if self.load_solution or self.continue_canceled: 

1011 success = self.rc_solution.load_solution( 

1012 self.pickle_path_solution(t_span, name_add_on), last_time_step=t_span[-1] 

1013 ) 

1014 if not success: 

1015 success = self.rc_solution.load_solution( 

1016 self.pickle_path_result(t_span, name_add_on), last_time_step=t_span[-1] 

1017 ) 

1018 if success or (not isinstance(success, bool) and success == 0): # ==0 because 0 could be the last time 

1019 # step (will never be the case.... -.-) 

1020 if not isinstance(success, bool): 

1021 if not self.continue_canceled: 

1022 self.rc_solution.delete_solution_except_first() 

1023 print(f"Partial solution detected but not used because continue_canceled is False.") 

1024 else: 

1025 print( 

1026 f"Simulation is started at loaded time step {success} on {success / t_span[-1] * 100:.2f} %." 

1027 ) 

1028 t_span = (success, t_span[-1]) 

1029 continued_simulation = True 

1030 else: 

1031 print("Solution was loaded from file.") 

1032 return None 

1033 

1034 if check_courant: 

1035 new_max_step = self.determine_max_step() 

1036 if self.settings.solve_settings.max_step > new_max_step: 

1037 print( 

1038 f"Because of Courant Number: New maximum step is set to {new_max_step} (previous:" 

1039 f" {self.settings.solve_settings.max_step})" 

1040 ) 

1041 self.settings.solve_settings.max_step = new_max_step 

1042 

1043 temperature_vector = np.array(self.temperature_vector).flatten() 

1044 system_matrix = self.system_matrix 

1045 time_symbols = self.get_time_dependent_symbols() 

1046 if time_symbols: 

1047 system_matrix = SparseSymbolicEvaluator(system_matrix, time_symbols) 

1048 

1049 system_handler_kwargs = { 

1050 "system_matrix": system_matrix, 

1051 "rc_solution": self.rc_solution, 

1052 "settings": self.settings, 

1053 "print_progress": print_progress, 

1054 "print_points": np.linspace(t_span[0], t_span[-1], 20 + 1), 

1055 "batch_end": t_span[-1], 

1056 "time_dependent_function": time_dependent_function, 

1057 } 

1058 

1059 # create the time_steps to solve the results (every X seconds a value) 

1060 t_eval = np.linspace( 

1061 t_span[0], t_span[1], max(1, int((t_span[1] - t_span[0]) / self.settings.save_all_x_seconds) + 1) 

1062 ) 

1063 

1064 if self.inhomogeneous_system: 

1065 input_matrix = self.input_matrix 

1066 if time_symbols: 

1067 input_matrix = SparseSymbolicEvaluator(input_matrix, time_symbols) 

1068 input_vector = self.input_vector 

1069 

1070 # Each Input can be called as a function to calculate new boundary values during simulation: 

1071 functions_list = self.rc_objects.inputs 

1072 # Also each Input has some functions that can be called independently of the object that return values for 

1073 # the calculation. 

1074 kwargs_functions = {} 

1075 for item in self.rc_objects.inputs: 

1076 kwargs_functions.update(item.get_kwargs_functions()) # Values should be overwritten so that the same 

1077 # values are calculated only once 

1078 

1079 system_handler_kwargs.update( 

1080 { 

1081 "input_matrix": input_matrix, 

1082 "input_vector": input_vector, 

1083 "functions_list": functions_list, 

1084 "kwargs_functions": kwargs_functions, 

1085 "t_eval": t_eval, 

1086 "first_time": t_span[0], 

1087 } 

1088 ) 

1089 

1090 # only update the keys that are in the predefined dictionary (it would also work if everything is passed anyway) 

1091 system_handler_kwargs.update({k: kwargs[k] for k in system_handler_kwargs if k in kwargs}) 

1092 

1093 system_handler = self.system_handler_type(**system_handler_kwargs) 

1094 save_prefix = f"{self.hash}" 

1095 if name_add_on: 

1096 save_prefix += f"_{name_add_on.removeprefix('_').removesuffix('_')}" 

1097 solve_handler = SolveIVPHandler( 

1098 system_handler=system_handler, 

1099 save_prefix=save_prefix, 

1100 solve_settings=self.settings.solve_settings, 

1101 ) 

1102 if hook_function is not None: 

1103 hook_function() 

1104 solve_handler.solve( 

1105 t_span=t_span, 

1106 y0=temperature_vector, 

1107 t_eval=t_eval, 

1108 continued_simulation=continued_simulation, 

1109 expected_solution_size_mb=expected_solution_size_mb, 

1110 ) 

1111 return None 

1112 

1113 @property 

1114 def system_handler_type(self): 

1115 """ 

1116 Returns the SystemHandler type depending on which system (in/homogeneous) is needed. 

1117 

1118 Returns 

1119 ------- 

1120 type : 

1121 The system handler type. 

1122 """ 

1123 if self.inhomogeneous_system: 

1124 return InhomogeneousSystemHandler 

1125 return HomogeneousSystemHandler 

1126 

1127 @property 

1128 def all_objects(self) -> list: 

1129 return [*(self.rc_objects.resistors or []), *(self.rc_objects.nodes or []), *(self.rc_objects.inputs or [])] 

1130 

1131 @property 

1132 def objects_with_identical_symbols(self) -> list: 

1133 """ 

1134 Get all objects with symbols that are relevant for the system matrix. 

1135 

1136 You cannot just use every object, because some (e.g. Resistors) can be summarized in local groups with one 

1137 symbol representing the whole group, like the equivalent resistance of multiple connected Resistors. 

1138 

1139 No `InternalHeatSource` s are returned, because their symbol is already represented in the corresponding Node. 

1140 

1141 Returns 

1142 ------- 

1143 list : 

1144 All network objects without the ones that are already taken into account by the symbols of another. 

1145 The list has the same order every time it is created. 

1146 """ 

1147 

1148 def filter_resistors(resistor_list): 

1149 # Create set of all resistors for O(1) lookups 

1150 all_resistors = {obj for obj in resistor_list if isinstance(obj, Resistor)} 

1151 to_exclude = set() 

1152 

1153 # Collect all resistors to exclude in order of appearance 

1154 for obj in resistor_list: 

1155 if isinstance(obj, Resistor) and obj not in to_exclude: 

1156 inbetween = set(obj.all_resistors_inbetween) - {obj} 

1157 to_exclude.update(inbetween & all_resistors) 

1158 

1159 # Filter maintaining original order 

1160 return [obj for obj in resistor_list if not obj in to_exclude] 

1161 

1162 result = [rc_obj for rc_obj in self.all_objects if not isinstance(rc_obj, InternalHeatSource)] 

1163 return filter_resistors(result) 

1164 

1165 def get_symbols(self): 

1166 """ 

1167 Returns a list with all symbols, except time dependent symbols. 

1168 

1169 Returns 

1170 ------- 

1171 list : 

1172 All symbols, except time dependent symbols. 

1173 """ 

1174 all_symbols = [symb for rc_object in self.objects_with_identical_symbols for symb in rc_object.symbols] 

1175 return all_symbols 

1176 

1177 def get_values(self): 

1178 """ 

1179 Returns a list with all values, except of time dependent symbols. 

1180 

1181 Returns 

1182 ------- 

1183 list : 

1184 All values, except of time dependent symbols. 

1185 """ 

1186 all_values = [ 

1187 val if isinstance(val, Basic) else np.float64(val) 

1188 for rc_object in self.objects_with_identical_symbols 

1189 for val in rc_object.values 

1190 ] 

1191 return all_values 

1192 

1193 def get_time_dependent_symbols(self) -> list: 

1194 """ 

1195 A list with all time dependent symbols but each of it is only added once. 

1196 

1197 Returns 

1198 ------- 

1199 list : 

1200 The time dependent symbols. 

1201 """ 

1202 symbols = [] 

1203 seen = set() 

1204 for rc_object in self.objects_with_identical_symbols: 

1205 for symb in rc_object.time_dependent_symbols: 

1206 if symb not in seen: 

1207 seen.add(symb) 

1208 symbols.append(symb) 

1209 return symbols 

1210 

1211 def get_symbols_and_values(self) -> tuple[list, list]: 

1212 """ 

1213 Return two lists of first: all symbols and second: all associated values of all rc objects. 

1214 

1215 Returns 

1216 ------- 

1217 tuple[list, list] : 

1218 

1219 """ 

1220 return self.get_symbols(), self.get_values() 

1221 

1222 def make_system_matrices(self): 

1223 """ 

1224 Creates the Jacobian matrices of the RC Network. 

1225 

1226 If self.load_from_pickle the whole network will be loaded from pickle file if it exists to prevent heavy 

1227 calculations. 

1228 

1229 Returns 

1230 ------- 

1231 sympy expression : 

1232 The system matrix as sympy expression. 

1233 """ 

1234 success = False 

1235 if self.load_from_pickle: 

1236 try: 

1237 success = self.load_matrices() 

1238 except Exception: 

1239 pass 

1240 if not success: 

1241 # fill system matrix 

1242 terms = [] 

1243 involved_symbols: list[set] = [] 

1244 temperature_symbols: list[set] = [] 

1245 variables = [] 

1246 for node in self.nodes: 

1247 # works only for one term 

1248 term, t_symbols, all_symbols = node.temperature_derivative_term() 

1249 terms.append(term) 

1250 involved_symbols.append(all_symbols) 

1251 temperature_symbols.append(t_symbols) 

1252 variables.append(node.temperature_symbol) 

1253 

1254 self._system_matrix_symbol = build_jacobian( 

1255 terms, variables, temperature_symbols, num_cores=self.num_cores_jacobian 

1256 ) 

1257 print("system matrix created") 

1258 

1259 self.save_matrices() 

1260 

1261 input_variables = list(self.input_vector_symbol) 

1262 self._input_matrix_symbol = build_jacobian( 

1263 terms, input_variables, involved_symbols, num_cores=self.num_cores_jacobian 

1264 ) 

1265 print("input matrix created") 

1266 

1267 self.save_matrices() 

1268 

1269 def check_courant(self, time_step: float): 

1270 """ 

1271 Calculates the Courant number for the whole network and raises an error if its larger than 1. 

1272 

1273 Parameters 

1274 ---------- 

1275 time_step : float 

1276 The maximum time step in seconds. 

1277 

1278 Raises 

1279 ------ 

1280 HighCourantNumberError : 

1281 If the courant number is greater than 1 the network will calculate shit. 

1282 

1283 """ 

1284 # TODO: Do not use the courant number but return a critical maximum time_step to easy raise an error during 

1285 # iterations. So: no iteration, but calculate the maximum time_step and return this so that it can be set. 

1286 mass_flow_nodes = self.rc_objects.mass_flow_nodes 

1287 mass_flow_node: MassFlowNode 

1288 for mass_flow_node in mass_flow_nodes: 

1289 if mass_flow_node.courant_number(time_step) > 1: 

1290 raise HighCourantNumberError 

1291 

1292 def matrix_to_latex_diag(self): 

1293 matrix = self.system_matrix_symbol 

1294 diag_elements = matrix.diagonal() 

1295 matrix_no_diag = matrix - diag(*diag_elements) 

1296 

1297 matrix_latex = latex(matrix_no_diag) 

1298 diag_latex = latex(Matrix(diag_elements).reshape(len(diag_elements), 1)) 

1299 

1300 return f"{matrix_latex} + \\mathrm{{diag}}\\left({diag_latex}\\right)"