Coverage for pyrc \ core \ network.py: 70%

417 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-29 14:14 +0200

1# ------------------------------------------------------------------------------- 

2# Copyright (C) 2026 Joel Kimmich, Tim Jourdan 

3# ------------------------------------------------------------------------------ 

4# License 

5# This file is part of PyRC, distributed under GPL-3.0-or-later. 

6# ------------------------------------------------------------------------------ 

7 

8from __future__ import annotations 

9import os 

10import pickle 

11from abc import abstractmethod 

12from copy import copy 

13from typing import Any, TYPE_CHECKING, Callable 

14 

15import numpy as np 

16from sympy import lambdify, latex, SparseMatrix, Matrix, diag, Symbol, Basic, ImmutableSparseMatrix, MutableSparseMatrix 

17import sympy as sym 

18import hashlib 

19import networkx as nx 

20from scipy.sparse import coo_matrix 

21 

22from pyrc.core.solver.stationary import solve_stationary 

23from pyrc.paths import run_folder 

24from pyrc.core.components.resistor import Resistor 

25from pyrc.core.inputs import InternalHeatSource 

26from pyrc.core.settings import initial_settings, Settings 

27from pyrc.core.solver.symbolic import SparseSymbolicEvaluator 

28from pyrc.core.solver.handler import InhomogeneousSystemHandler, SolveIVPHandler, HomogeneousSystemHandler 

29from pyrc.tools.errors import HighCourantNumberError 

30from pyrc.tools.functions import add_leading_underscore 

31 

32from pyrc.tools.science import build_jacobian 

33from pyrc.core.components.templates import RCObjects, RCSolution, EquationItem, initial_rc_objects, Cell 

34from pyrc.core.components.templates import solution_object 

35from pyrc.core.components.capacitor import Capacitor 

36 

37if TYPE_CHECKING: 

38 from pyrc.core.nodes import MassFlowNode 

39 from pyrc.core.components.templates import EquationItemInput 

40 from scipy.sparse import spmatrix, sparray 

41 

42 

43class RCNetwork: 

44 def __init__( 

45 self, 

46 save_to_pickle: bool = True, 

47 load_from_pickle: bool = True, 

48 load_solution: bool = True, 

49 num_cores_jacobian: int = 1, # TODO: Parallelization only works on linux currently (because it forks instead 

50 # of spawning it). Problem is: Sympy symbols are not picklable. 

51 settings: Settings = initial_settings, 

52 rc_objects: RCObjects = initial_rc_objects, 

53 rc_solution: RCSolution = solution_object, 

54 # strategy: str = "lambdify", 

55 continue_canceled: bool = True, 

56 ): 

57 """ 

58 

59 Parameters 

60 ---------- 

61 save_to_pickle 

62 load_from_pickle 

63 load_solution 

64 num_cores_jacobian 

65 settings 

66 rc_objects 

67 rc_solution 

68 continue_canceled : bool, optional 

69 If True, continue canceled simulations. 

70 Overwrites load_solution 

71 """ 

72 self.rc_objects: RCObjects = rc_objects 

73 self.settings: Settings = settings 

74 self._system_matrix_symbol: MutableSparseMatrix = None 

75 self._input_matrix_symbol: MutableSparseMatrix = None 

76 self._temperature_vector_symbol = None 

77 self._input_vector_symbol = None 

78 self._input_matrix = None 

79 self._system_matrix = None 

80 self.__hash = None 

81 self.rc_solution: RCSolution = rc_solution 

82 # link rc objects to rc solution 

83 self.rc_solution.rc_objects = rc_objects 

84 # self.strategy = strategy 

85 

86 # For each time dependent symbol the following values are not allowed because they would make an entry in the 

87 # system matrix be infinity 

88 self.forbidden_values: dict[Symbol, set] = {} 

89 

90 self.save_to_pickle: bool = save_to_pickle 

91 self.load_from_pickle: bool = load_from_pickle 

92 self.load_solution: bool = load_solution 

93 self.continue_canceled: bool = continue_canceled 

94 

95 if num_cores_jacobian is None: 

96 num_cores_jacobian = os.cpu_count() 

97 self.num_cores_jacobian = min(max(num_cores_jacobian, 1), os.cpu_count()) 

98 

99 self.groups: dict[str, Capacitor] = {} 

100 

101 def __getattr__(self, item): 

102 if getattr(self.rc_objects, item, None): 

103 return self.rc_objects.__getattribute__(item) 

104 return AttributeError 

105 

106 def create_network(self, *args, **kwargs): 

107 EquationItem.reset_counter() 

108 Resistor.reset_counter() 

109 self.rc_objects.wipe_all() 

110 self._create_network(*args, **kwargs) 

111 assert self.nodes is not None 

112 

113 @abstractmethod 

114 def _create_network(self, *args, **kwargs): 

115 pass 

116 

117 def copy_dict(self): 

118 return { 

119 "save_to_pickle": self.save_to_pickle, 

120 "load_from_pickle": self.load_from_pickle, 

121 "load_solution": self.load_solution, 

122 "num_cores_jacobian": self.num_cores_jacobian, 

123 "settings": copy(self.settings), 

124 } 

125 

126 def __copy__(self): 

127 return RCNetwork(**self.copy_dict()) 

128 

129 @property 

130 def nodes(self): 

131 return self.rc_objects.nodes 

132 

133 @property 

134 def hash(self): 

135 """ 

136 Compute a deterministic hash for the :class:`RCNetwork` where node identity (class name + obj.id) and topology matter. 

137 

138 For :class:`Capacitor` instances, an optional internal_heat_source (identified by its id) is encoded as a node attribute. 

139 

140 For :class:`Cell` subclass instances, position and delta arrays are encoded. 

141 

142 Returns 

143 ------- 

144 str : 

145 SHA-256 hex digest of the canonical edge and node representation. 

146 """ 

147 if self.__hash is None: 

148 

149 def node_key(_obj) -> str: 

150 return f"{type(_obj).__name__}:{_obj.id}" 

151 

152 def array_str(arr: np.ndarray) -> str: 

153 return ",".join(f"{v:.17g}" for v in arr.ravel()) 

154 

155 graph = nx.Graph() 

156 for obj in self.rc_objects.all: 

157 attrs = {} 

158 if isinstance(obj, Capacitor): 

159 attrs["heat_source_id"] = ( 

160 obj.internal_heat_source.id if obj.internal_heat_source is not None else None 

161 ) 

162 if isinstance(obj, Cell): 

163 attrs["position"] = array_str(obj.position) 

164 attrs["delta"] = array_str(obj.delta) 

165 graph.add_node(node_key(obj), **attrs) 

166 

167 # loop over all linked/connected network objects 

168 for obj in self.rc_objects.all: 

169 for neighbour in obj.neighbours: 

170 graph.add_edge(node_key(obj), node_key(neighbour)) 

171 

172 node_str = "|".join( 

173 f"{nid}#{data}" 

174 for nid, data in sorted((nid, str(sorted(data.items()))) for nid, data in graph.nodes(data=True)) 

175 ) 

176 edge_str = "|".join(sorted(f"{min(u, v)}-{max(u, v)}" for u, v in graph.edges())) 

177 canonical = f"nodes:{node_str};edges:{edge_str}" 

178 self.__hash = hashlib.sha256(canonical.encode()).hexdigest() 

179 return self.__hash 

180 

181 @property 

182 def time_steps(self) -> list: 

183 """ 

184 Returns a list with all time steps. 

185 

186 Returns 

187 ------- 

188 list : 

189 The time steps. 

190 """ 

191 return self.rc_solution.time_steps 

192 

193 def reset_properties(self): 

194 self._temperature_vector_symbol = None 

195 

196 @property 

197 def inputs(self) -> list[EquationItemInput]: 

198 return self.rc_objects.inputs 

199 

200 def change_static_dynamic(self, calculate_static: bool | str): 

201 if isinstance(calculate_static, str): 

202 match calculate_static.lower(): 

203 case "static" | "statisch" | "s": 

204 calculate_static = True 

205 case _: 

206 calculate_static = False 

207 self.settings.calculate_static = calculate_static 

208 

209 def get_node_by_id(self, _id: str | int): 

210 """ 

211 Returns the node with the given id. 

212 

213 Parameters 

214 ---------- 

215 _id : str | int 

216 The id of the node. 

217 

218 Returns 

219 ------- 

220 TemperatureNode | InternalHeatSource : 

221 """ 

222 if isinstance(_id, str): 

223 _id = int(_id) 

224 return next((x for x in self.rc_objects.all_equation_objects if x.id == _id), None) 

225 

226 def no_infinity(self, sp_matrix: SparseMatrix | spmatrix | sparray) -> bool: 

227 """ 

228 Checks the given sparse matrix for infinity entries and symbolic singularities. 

229 

230 Notes 

231 ----- 

232 Saves a dict, which maps each free sympy symbol to a set of values at which at least one symbolic entry 

233 diverges. Empty if no symbolic entries exist. 

234 

235 Parameters 

236 ---------- 

237 sp_matrix : SparseMatrix | spmatrix | sparray 

238 A sparse matrix/vector to check, may contain numeric values or sympy 

239 expressions. 

240 

241 Returns 

242 ------- 

243 tuple[bool, dict] : 

244 bool : True if no numeric infinity was found. 

245 """ 

246 sp_matrix: SparseMatrix | spmatrix | sparray 

247 if isinstance(sp_matrix, SparseMatrix) or isinstance(sp_matrix, ImmutableSparseMatrix): 

248 entries = sp_matrix.todok().values() 

249 else: 

250 sp_matrix: spmatrix | sparray 

251 entries = sp_matrix.data 

252 

253 for entry in entries: 

254 if isinstance(entry, Basic): 

255 if not entry.free_symbols: 

256 if entry in (sym.oo, -sym.oo, sym.zoo, sym.nan): 

257 return False 

258 else: 

259 _, denominator = sym.fraction(sym.cancel(entry)) 

260 if denominator != sym.Integer(1): 

261 for s in denominator.free_symbols: 

262 solution = sym.solve(denominator, s) 

263 self.forbidden_values.setdefault(s, set()).update(solution) 

264 else: 

265 if np.isinf(entry): 

266 return False 

267 

268 return True 

269 

270 def _get_matrix_symbol(self, key: str) -> SparseMatrix | ImmutableSparseMatrix: 

271 """ 

272 Returns the desired matrix in symbolic notation. 

273 

274 Parameters 

275 ---------- 

276 key : str 

277 Either "symbol" or "input" to determine the matrix type. 

278 

279 Returns 

280 ------- 

281 SparseMatrix | ImmutableSparseMatrix : 

282 The (sympy) symbolic notation of the matrix. 

283 """ 

284 attr = f"_{key}_matrix_symbol" 

285 if getattr(self, attr) is None: 

286 self.make_system_matrices() 

287 return getattr(self, attr) 

288 

289 def _build_matrix_function(self, key: str) -> Callable: 

290 """ 

291 Builds the desired matrix lambda function using the symbols matrix, respectively. 

292 

293 Parameters 

294 ---------- 

295 key : str 

296 Either "symbol" or "input" to determine the matrix type. 

297 

298 Returns 

299 ------- 

300 Callable : 

301 The function of the matrix. 

302 """ 

303 matrix_symbol = self._get_matrix_symbol(key) 

304 all_symbols = self.get_symbols() 

305 if self.get_time_dependent_symbols(): 

306 ordered_symbols = [ 

307 next((s for s in matrix_symbol.free_symbols if s.name == symbol.name), symbol) for symbol in all_symbols 

308 ] 

309 return lambdify(ordered_symbols, matrix_symbol, "sympy") 

310 return lambdify(all_symbols, matrix_symbol, "scipy") 

311 

312 def _build_matrix_direct(self, key: str) -> spmatrix: 

313 """ 

314 Bypasses lambdify by substituting all symbol values via name-based xreplace. 

315 

316 Parameters 

317 ---------- 

318 key : str 

319 Either "symbol" or "input" to determine the matrix type. 

320 

321 Notes 

322 ----- 

323 The replace must be done using the name of the symbols and not the objects itself because they might be loaded 

324 from disk and might not match, but the names do. 

325 

326 Returns 

327 ------- 

328 scipy.spmatrix : 

329 A scipy sparse matrix with all values substituted. 

330 """ 

331 matrix_symbol = self._get_matrix_symbol(key) 

332 name_to_value = {s.name: v for s, v in zip(self.get_symbols(), self.get_values())} 

333 free = matrix_symbol.free_symbols 

334 substitution_map = {s: name_to_value[s.name] for s in free if s.name in name_to_value} 

335 

336 dok = matrix_symbol.todok() 

337 rows, cols, data = [], [], [] 

338 for (i, j), expr in dok.items(): 

339 rows.append(i) 

340 cols.append(j) 

341 data.append(float(expr.xreplace(substitution_map))) 

342 

343 return coo_matrix((data, (rows, cols)), shape=matrix_symbol.shape).tocsr() 

344 

345 # import warnings 

346 # import logging 

347 # import pickle 

348 # from concurrent.futures import ProcessPoolExecutor 

349 # from pathlib import Path 

350 # from typing import Callable 

351 # 

352 # import numba 

353 # import numpy as np 

354 # from scipy.sparse import coo_matrix, csr_matrix 

355 # from sympy import lambdify 

356 # from sympy.printing import ccode 

357 # 

358 # def _xreplace_element(args: tuple) -> object: 

359 # """Apply xreplace to a single expression.""" 

360 # expr, static_map = args 

361 # return expr.xreplace(static_map) 

362 # 

363 # def _build_c_source(reduced_exprs: list, td_ordered: list, func_name: str) -> str: 

364 # """ 

365 # Generates a C source file with a single function iterating over all nonzero 

366 # expressions. 

367 # 

368 # Parameters 

369 # ---------- 

370 # reduced_exprs : list 

371 # Sympy expressions with static symbols already substituted. 

372 # td_ordered : list 

373 # Ordered time-dependent sympy symbols. 

374 # func_name : str 

375 # Name of the generated C function. 

376 # 

377 # Returns 

378 # ------- 

379 # str : 

380 # C source code as a string. 

381 # """ 

382 # args = ", ".join(f"double {s.name}" for s in td_ordered) 

383 # lines = [ 

384 # "#include <stddef.h>", 

385 # f"void {func_name}({args}, double *out, size_t n) {{", 

386 # ] 

387 # for i, expr in enumerate(reduced_exprs): 

388 # lines.append(f" out[{i}] = {ccode(expr)};") 

389 # lines.append("}") 

390 # return "\n".join(lines) 

391 # 

392 # def _compile_c_function( 

393 # source: str, 

394 # func_name: str, 

395 # n_out: int, 

396 # cache_path: Path, 

397 # ) -> Callable: 

398 # """ 

399 # Compiles C source to a shared library and returns a callable via ctypes. 

400 # 

401 # Parameters 

402 # ---------- 

403 # source : str 

404 # C source code string. 

405 # func_name : str 

406 # Name of the C function to load. 

407 # n_out : int 

408 # Number of output values (nnz). 

409 # cache_path : Path 

410 # Path to write the compiled .so / .dll. 

411 # 

412 # Returns 

413 # ------- 

414 # Callable : 

415 # Python callable wrapping the compiled C function. 

416 # """ 

417 # import ctypes 

418 # import subprocess 

419 # import tempfile 

420 # 

421 # src_path = cache_path.with_suffix(".c") 

422 # src_path.write_text(source) 

423 # 

424 # result = subprocess.run( 

425 # ["gcc", "-O3", "-shared", "-fPIC", "-o", str(cache_path), str(src_path)], 

426 # capture_output=True, 

427 # text=True, 

428 # ) 

429 # if result.returncode != 0: 

430 # raise RuntimeError(result.stderr) 

431 # 

432 # lib = ctypes.CDLL(str(cache_path)) 

433 # fn = getattr(lib, func_name) 

434 # fn.restype = None 

435 # 

436 # out_type = ctypes.c_double * n_out 

437 # 

438 # def c_callable(*td_vals: float) -> np.ndarray: 

439 # out = out_type() 

440 # fn(*[ctypes.c_double(v) for v in td_vals], out, ctypes.c_size_t(n_out)) 

441 # return np.frombuffer(out, dtype=np.float64).copy() 

442 # 

443 # return c_callable 

444 

445 # def _prepare_matrix_ingredients( 

446 # self, key: str, parallel: bool 

447 # ) -> tuple[list, list, np.ndarray, np.ndarray, tuple, list]: 

448 # """ 

449 # Extracts and reduces nonzero expressions, substituting all static symbols. 

450 # 

451 # Parameters 

452 # ---------- 

453 # key : str 

454 # Either "symbol" or "input" to determine the matrix type. 

455 # parallel : bool 

456 # If True, parallelizes static substitution via ProcessPoolExecutor. 

457 # 

458 # Returns 

459 # ------- 

460 # tuple : 

461 # td_ordered, reduced_exprs, rows, cols, shape, raw exprs (unreduced) 

462 # """ 

463 # matrix_symbol = self._get_matrix_symbol(key) 

464 # all_symbols = self.get_symbols() 

465 # all_values = self.get_values() 

466 # td_symbols = self.get_time_dependent_symbols() 

467 # td_names = {s.name for s in td_symbols} 

468 # 

469 # static_map = { 

470 # s_mat: val 

471 # for sym, val in zip(all_symbols, all_values) 

472 # if sym.name not in td_names 

473 # for s_mat in matrix_symbol.free_symbols 

474 # if s_mat.name == sym.name 

475 # } 

476 # 

477 # td_ordered = [ 

478 # next(s for s in matrix_symbol.free_symbols if s.name == td.name) 

479 # for td in td_symbols 

480 # if any(s.name == td.name for s in matrix_symbol.free_symbols) 

481 # ] 

482 # 

483 # dok = matrix_symbol.todok() 

484 # shape = matrix_symbol.shape 

485 # keys, exprs = zip(*dok.items()) if dok else ([], []) 

486 # rows = np.array([k[0] for k in keys], dtype=np.int32) 

487 # cols = np.array([k[1] for k in keys], dtype=np.int32) 

488 # 

489 # TODO: do not do it like that, it is worse than the solution that already exists. 

490 # if parallel: 

491 # tasks = [(expr, static_map) for expr in exprs] 

492 # with ProcessPoolExecutor() as executor: 

493 # reduced_exprs = list(executor.map(_xreplace_element, tasks)) 

494 # else: 

495 # reduced_exprs = [expr.xreplace(static_map) for expr in exprs] 

496 # 

497 # return td_ordered, reduced_exprs, rows, cols, shape 

498 # 

499 # TODO: This is worse than the current solution, but the other version (numby/c...) should be implemented to speed 

500 # things up. But attention: Currently, it is just vibe coded... 

501 # def _build_matrix_function_lambdify( 

502 # self, key: str, parallel: bool = False 

503 # ) -> Callable: 

504 # """ 

505 # Builds the matrix function using lambdify with numpy backend. 

506 # 

507 # Parameters 

508 # ---------- 

509 # key : str 

510 # Either "symbol" or "input" to determine the matrix type. 

511 # parallel : bool 

512 # If True, parallelizes static symbol substitution. 

513 # 

514 # Returns 

515 # ------- 

516 # Callable : 

517 # Function accepting td symbol values, returning a csr_matrix. 

518 # """ 

519 # td_ordered, reduced_exprs, rows, cols, shape = ( 

520 # self._prepare_matrix_ingredients(key, parallel) 

521 # ) 

522 # 

523 # if not len(rows): 

524 # empty = csr_matrix(shape, dtype=float) 

525 # 

526 # def assembled(*_) -> csr_matrix: 

527 # return empty 

528 # 

529 # return assembled 

530 # 

531 # data_func = lambdify(td_ordered, reduced_exprs, "numpy") 

532 # template = coo_matrix( 

533 # (np.ones(len(rows), dtype=float), (rows, cols)), shape=shape 

534 # ).tocsr() 

535 # 

536 # def assembled(*td_vals: float) -> csr_matrix: 

537 # template.data[:] = np.asarray(data_func(*td_vals), dtype=float) 

538 # return template 

539 # 

540 # return assembled 

541 # 

542 # def _build_matrix_function_numba(self, key: str, parallel: bool = False) -> Callable: 

543 # """ 

544 # Builds the matrix function using a numba-JIT compiled callable, with 

545 # caching to self.save_folder_path keyed by self.hash. 

546 # 

547 # Parameters 

548 # ---------- 

549 # key : str 

550 # Either "symbol" or "input" to determine the matrix type. 

551 # parallel : bool 

552 # If True, parallelizes static symbol substitution. 

553 # 

554 # Returns 

555 # ------- 

556 # Callable : 

557 # Function accepting td symbol values, returning a csr_matrix. 

558 # """ 

559 # td_ordered, reduced_exprs, rows, cols, shape = ( 

560 # self._prepare_matrix_ingredients(key, parallel) 

561 # ) 

562 # 

563 # if not len(rows): 

564 # empty = csr_matrix(shape, dtype=float) 

565 # 

566 # def assembled(*_) -> csr_matrix: 

567 # return empty 

568 # 

569 # return assembled 

570 # 

571 # cache_path = Path(self.save_folder_path) / f"{self.hash}_{key}_numba.pkl" 

572 # 

573 # if cache_path.exists(): 

574 # with open(cache_path, "rb") as f: 

575 # data_func = pickle.load(f) 

576 # else: 

577 # data_func = lambdify(td_ordered, reduced_exprs, "numpy") 

578 # data_func_nb = numba.njit(data_func, cache=True) 

579 # with open(cache_path, "wb") as f: 

580 # pickle.dump(data_func_nb, f) 

581 # data_func = data_func_nb 

582 # 

583 # template = coo_matrix( 

584 # (np.ones(len(rows), dtype=float), (rows, cols)), shape=shape 

585 # ).tocsr() 

586 # 

587 # def assembled(*td_vals: float) -> csr_matrix: 

588 # template.data[:] = np.asarray(data_func(*td_vals), dtype=float) 

589 # return template 

590 # 

591 # return assembled 

592 # 

593 # def _build_matrix_function_c(self, key: str, parallel: bool = False) -> Callable: 

594 # """ 

595 # Builds the matrix function using a compiled C shared library via ctypes, 

596 # falling back to numba if compilation fails. 

597 # 

598 # Parameters 

599 # ---------- 

600 # key : str 

601 # Either "symbol" or "input" to determine the matrix type. 

602 # parallel : bool 

603 # If True, parallelizes static symbol substitution. 

604 # 

605 # Returns 

606 # ------- 

607 # Callable : 

608 # Function accepting td symbol values, returning a csr_matrix. 

609 # """ 

610 # td_ordered, reduced_exprs, rows, cols, shape = ( 

611 # self._prepare_matrix_ingredients(key, parallel) 

612 # ) 

613 # 

614 # if not len(rows): 

615 # empty = csr_matrix(shape, dtype=float) 

616 # 

617 # def assembled(*_) -> csr_matrix: 

618 # return empty 

619 # 

620 # return assembled 

621 # 

622 # func_name = f"matrix_func_{self.hash}_{key}" 

623 # so_path = Path(self.save_folder_path) / f"{func_name}.so" 

624 # 

625 # try: 

626 # if so_path.exists(): 

627 # import ctypes 

628 # lib = ctypes.CDLL(str(so_path)) 

629 # fn = getattr(lib, func_name) 

630 # n_out = len(rows) 

631 # out_type = ctypes.c_double * n_out 

632 # 

633 # def data_func(*td_vals: float) -> np.ndarray: 

634 # out = out_type() 

635 # fn( 

636 # *[ctypes.c_double(v) for v in td_vals], 

637 # out, 

638 # ctypes.c_size_t(n_out), 

639 # ) 

640 # return np.frombuffer(out, dtype=np.float64).copy() 

641 # else: 

642 # source = _build_c_source(reduced_exprs, td_ordered, func_name) 

643 # data_func = _compile_c_function( 

644 # source, func_name, len(rows), so_path 

645 # ) 

646 # except Exception as e: 

647 # print( 

648 # f"C compilation failed for '{key}' matrix ({e}), falling back to numba." 

649 # ) 

650 # return self._build_matrix_function_numba(key, parallel=parallel) 

651 # 

652 # template = coo_matrix( 

653 # (np.ones(len(rows), dtype=float), (rows, cols)), shape=shape 

654 # ).tocsr() 

655 # 

656 # def assembled(*td_vals: float) -> csr_matrix: 

657 # template.data[:] = np.asarray(data_func(*td_vals), dtype=float) 

658 # return template 

659 # 

660 # return assembled 

661 # 

662 # def _build_matrix_function( 

663 # self, key: str, strategy: str | None = None 

664 # ) -> Callable: 

665 # """ 

666 # Dispatcher selecting the matrix function build strategy. 

667 # 

668 # Parameters 

669 # ---------- 

670 # key : str 

671 # Either "symbol" or "input" to determine the matrix type. 

672 # strategy : str | None 

673 # One of "lambdify", "parallel", "numba", "c". If None, falls back 

674 # to self.strategy set at __init__. 

675 # 

676 # Returns 

677 # ------- 

678 # Callable : 

679 # Function accepting td symbol values, returning a csr_matrix. 

680 # """ 

681 # s = strategy if strategy is not None else self.strategy 

682 # if s == "lambdify": 

683 # return self._build_matrix_function_lambdify(key, parallel=False) 

684 # if s == "parallel": 

685 # return self._build_matrix_function_lambdify(key, parallel=True) 

686 # if s == "numba": 

687 # return self._build_matrix_function_numba(key, parallel=False) 

688 # if s == "c": 

689 # return self._build_matrix_function_c(key, parallel=False) 

690 # raise ValueError(f"Unknown strategy '{s}'. Choose from: lambdify, parallel, numba, c.") 

691 # 

692 # def _build_matrix( 

693 # self, key: str, strategy: str | None = None 

694 # ) -> csr_matrix: 

695 # """ 

696 # Inputs all values into the desired matrix function. 

697 # 

698 # Parameters 

699 # ---------- 

700 # key : str 

701 # Either "symbol" or "input" to determine the matrix type. 

702 # strategy : str | None 

703 # Overrides self.strategy if provided. 

704 # 

705 # Returns 

706 # ------- 

707 # csr_matrix : 

708 # The desired matrix. 

709 # """ 

710 # attr = f"_{key}_matrix" 

711 # if getattr(self, attr) is None: 

712 # if not self.get_time_dependent_symbols(): 

713 # result = self._build_matrix_direct(key) 

714 # else: 

715 # result = self._build_matrix_function(key, strategy=strategy)( 

716 # *self.get_values() 

717 # ) 

718 # assert self.no_infinity(result), ( 

719 # f"Infinity detected in {key} matrix. System cannot be solved." 

720 # ) 

721 # setattr(self, attr, result) 

722 # return getattr(self, attr) 

723 

724 def _build_matrix(self, key: str) -> SparseMatrix | ImmutableSparseMatrix | spmatrix | sparray: 

725 """ 

726 Inputs all values (can be symbolic values) into the desired matrix function. 

727 

728 Parameters 

729 ---------- 

730 key : str 

731 Either "symbol" or "input" to determine the matrix type. 

732 

733 Returns 

734 ------- 

735 SparseMatrix | ImmutableSparseMatrix | spmatrix | sparray : 

736 The desired matrix. If no time dependent variables are used, it is a scipy sparse matrix, otherwise it's a 

737 sympy SparseMatrix. 

738 """ 

739 attr = f"_{key}_matrix" 

740 if getattr(self, attr) is None: 

741 if self.get_time_dependent_symbols(): 

742 result = self._build_matrix_function(key)(*self.get_values()) 

743 else: 

744 result = self._build_matrix_direct(key) 

745 assert self.no_infinity(result), f"Infinity detected in {key} matrix. System cannot be solved." 

746 setattr(self, attr, result) 

747 return getattr(self, attr) 

748 

749 @property 

750 def system_matrix_symbol(self) -> SparseMatrix | ImmutableSparseMatrix: 

751 return self._get_matrix_symbol("system") 

752 

753 @property 

754 def input_matrix_symbol(self) -> SparseMatrix | ImmutableSparseMatrix: 

755 return self._get_matrix_symbol("input") 

756 

757 @property 

758 def system_matrix_function(self) -> Callable: 

759 return self._build_matrix_function("system") 

760 

761 @property 

762 def input_matrix_function(self) -> Callable: 

763 return self._build_matrix_function("input") 

764 

765 @property 

766 def system_matrix(self) -> SparseMatrix | ImmutableSparseMatrix | spmatrix | sparray: 

767 return self._build_matrix("system") 

768 

769 @property 

770 def input_matrix(self) -> SparseMatrix | ImmutableSparseMatrix | spmatrix | sparray: 

771 return self._build_matrix("input") 

772 

773 @property 

774 def temperature_vector_symbol(self) -> list: 

775 if self._temperature_vector_symbol is None: 

776 result = [node.temperature_symbol for node in self.nodes] 

777 self._temperature_vector_symbol = result 

778 return self._temperature_vector_symbol 

779 

780 @property 

781 def temperature_vector(self) -> np.ndarray: 

782 result = [node.temperature for node in self.nodes] 

783 return np.array(result).flatten() 

784 

785 @property 

786 def input_vector(self) -> np.ndarray: 

787 result = [n for input_item in self.inputs for n in input_item.values] 

788 return np.array(result).flatten() 

789 

790 @property 

791 def input_vector_symbol(self) -> np.ndarray: 

792 if self._input_vector_symbol is None: 

793 result = [] 

794 if self.inputs is not None and self.inputs != []: 

795 result = [n for input_item in self.inputs for n in input_item.symbols] 

796 self._input_vector_symbol = np.array(result).flatten() 

797 return self._input_vector_symbol 

798 

799 # @property 

800 # def seconds_of_day(self): 

801 # """ 

802 # Returns the seconds that have already passed on the day of ``self.start_time``. 

803 # 

804 # Returns 

805 # ------- 

806 # float | int : 

807 # The passed seconds of the day. 

808 # """ 

809 # return (self.settings.start_date - self.settings.start_date.astype("datetime64[D]")) / np.timedelta64(1, "s") 

810 

811 @property 

812 def save_folder_path(self): 

813 if self.settings.save_folder_path is None: 

814 return run_folder 

815 return self.settings.save_folder_path 

816 

817 @property 

818 def pickle_path(self) -> str: 

819 filename = f"{self.hash}.pickle" 

820 return os.path.normpath(os.path.join(self.save_folder_path, filename)) 

821 

822 def pickle_path_result(self, t_span, name_add_on=None): 

823 name_add_on = add_leading_underscore(name_add_on) 

824 filename = f"{self.hash}{name_add_on}_{int(t_span[1])}_result.pickle" 

825 return os.path.normpath(os.path.join(self.save_folder_path, filename)) 

826 

827 @property 

828 def pickle_path_single_solution(self) -> str: 

829 filename = f"{self.hash}_single_solution.pickle" 

830 return os.path.normpath(os.path.join(self.save_folder_path, filename)) 

831 

832 def pickle_path_solution(self, t_span, name_add_on=None) -> str: 

833 name_add_on = add_leading_underscore(name_add_on) 

834 filename = f"{self.hash}{name_add_on}_{int(t_span[0])}_{int(t_span[1])}_solution.pickle" 

835 return os.path.normpath(os.path.join(self.save_folder_path, filename)) 

836 

837 def save_matrices(self): 

838 if self.save_to_pickle: 

839 file_path = self.pickle_path 

840 matrices = [ 

841 self._system_matrix_symbol, 

842 self._input_matrix_symbol, 

843 self._input_vector_symbol, 

844 self._temperature_vector_symbol, 

845 self.forbidden_values, 

846 ] 

847 with open(file_path, "wb") as f: 

848 pickle.dump(matrices, f) 

849 

850 def load_matrices(self): 

851 file_path = self.pickle_path 

852 if os.path.exists(file_path): 

853 with open(file_path, "rb") as f: 

854 matrices = pickle.load(f) 

855 self._system_matrix_symbol = matrices[0] 

856 self._input_matrix_symbol = matrices[1] 

857 self._input_vector_symbol = matrices[2] 

858 self._temperature_vector_symbol = matrices[3] 

859 self.forbidden_values = matrices[4] 

860 print("matrices loaded") 

861 return True 

862 else: 

863 print("No network found nor loaded.") 

864 return False 

865 

866 def save_last_step_solution(self, pickle_path_single_solution: str = None): 

867 """ 

868 Saves the last time step solution. 

869 """ 

870 if pickle_path_single_solution is None: 

871 pickle_path_single_solution = self.pickle_path_single_solution 

872 file_path = pickle_path_single_solution 

873 self.rc_solution.save_last_step(file_path) 

874 

875 def load_initial_values(self, return_bool: bool = False, pickle_path_single_solution: str = None) -> None | bool: 

876 """ 

877 Loads the last time step solution (temperature vector) and sets it as initial values. 

878 """ 

879 if pickle_path_single_solution is None: 

880 pickle_path_single_solution = self.pickle_path_single_solution 

881 file_path = pickle_path_single_solution 

882 if os.path.exists(file_path): 

883 with open(file_path, "rb") as f: 

884 solution_tuple = pickle.load(f) 

885 temp_vector = solution_tuple[0] 

886 input_vector = solution_tuple[1] 

887 for node, temp in zip(self.rc_objects.nodes, temp_vector): 

888 node.initial_temperature = temp 

889 for item, value in zip(self.inputs, input_vector): 

890 item.initial_value = value 

891 if return_bool: 

892 return True 

893 else: 

894 print("No temperature vector found nor loaded.") 

895 if return_bool: 

896 return False 

897 return None 

898 

899 @property 

900 def inhomogeneous_system(self) -> bool: 

901 """ 

902 Returns True if inputs are existing and the network is inhomogeneous. False otherwise. 

903 

904 Returns 

905 ------- 

906 bool : 

907 True if inputs are existing and the network is inhomogeneous. False otherwise. 

908 """ 

909 if self.rc_objects.inputs is None or self.rc_objects.inputs == []: 

910 return False 

911 return True 

912 

913 def determine_max_step(self) -> float: 

914 max_step = 1 

915 if self.rc_objects.mass_flow_nodes is not None or self.rc_objects.mass_flow_nodes != []: 

916 loop_counter = 0 

917 while loop_counter < 1000: 

918 try: 

919 self.check_courant(max_step) # approximated maximum time step used during iterations. 

920 # if no error 

921 break 

922 except HighCourantNumberError: 

923 loop_counter += 1 

924 max_step *= 0.99 

925 if loop_counter >= 1000: 

926 raise HighCourantNumberError 

927 return max_step 

928 

929 def solve_stationary(self, time_step: float | int = 0): 

930 """ 

931 Determines the analytic solution of the network (if it exists). It works for symbolic and numeric matrices. 

932 

933 Parameters 

934 ---------- 

935 time_step : float | int, optional 

936 The time step as which the solution is saved in the rc_solution object. 

937 This parameter has no effect if the network is symbolic. 

938 

939 Returns 

940 ------- 

941 sp.Matrix | MutableDenseMatrix or None: 

942 Stationary symbolic temperature vector of shape (n, 1) (only if result is symbolic, otherwise nothing is 

943 returned and the result is written to the rc_solution object using time_step). 

944 """ 

945 if self.inhomogeneous_system: 

946 result = solve_stationary( 

947 system_matrix=self.system_matrix, 

948 input_matrix=self.input_matrix, 

949 input_vector=self.input_vector, 

950 ) 

951 else: 

952 result = solve_stationary( 

953 system_matrix=self.system_matrix, 

954 ) 

955 

956 if isinstance(result, Basic) and result.free_symbols is not None: 

957 # return symbolic result 

958 return result 

959 else: 

960 # save result in rc_solution 

961 self.rc_solution.add_to_solution([np.array(time_step).reshape(-1,)], [np.array(result).reshape(-1, 1)]) 

962 return None 

963 

964 def solve_network( 

965 self, 

966 t_span: tuple | Any = None, 

967 print_progress=False, 

968 name_add_on: str = "", 

969 check_courant: bool = True, 

970 time_dependent_function: Callable = None, 

971 hook_function: Callable = None, 

972 expected_solution_size_mb=5000, 

973 **kwargs: dict[str, int | Any], 

974 ) -> None: 

975 """ 

976 Solves the network at the given times. 

977 

978 If time_vector is None, one second is calculated. 

979 

980 Parameters 

981 ---------- 

982 t_span : tuple | Any 

983 Interval of integration (t0, tf). The solver starts with t=t0 and integrates until it reaches t=tf. 

984 Both t0 and tf must be floats or values interpretable by the float conversion function. 

985 print_progress : bool, optional 

986 If True, some print-outs are made during solving to get the time step that is currently simulated. 

987 name_add_on : str, optional 

988 Optional add-on to the name of in-between saves that is placed after the hash value (separated by "_"). 

989 Example save name: 

990 42395d3d9f07f06ce9c9cb609b406aa54fc2dc5e8c4d9cc75987d9a02541ad2f_nameAddOn_zw_000001080_h.pickle 

991 check_courant : bool, optional 

992 If True, self.check_courant(0.4) is run before simulating. 

993 time_dependent_function : Callable, optional 

994 A function that calculates all time dependent variables within the time step and returns them in the same order as 

995 self.get_time_dependent_symbols(). 

996 It gets parameters like this:\n 

997 ``time_dependent_function(time, temperature_vector)`` \n 

998 or\n 

999 ``time_dependent_function(time, temperature_vector, input_vector)``\\.\n 

1000 This function is required if time dependent symbols exist. 

1001 It must return an iterable (e.g. list). 

1002 To not run into Errors just use ``*args``\\, ``**kwargs`` at the end in case more values are passed then 

1003 needed. 

1004 hook_function : Callable, optional 

1005 A function that is run before the network is solved using solve_ivp. 

1006 Can be used to catch the time before the solving actually starts (e.g. used in `PerformanceTest`\\). 

1007 expected_solution_size_mb : int | float, optional 

1008 The expected solution size in Megabytes. Is used for RAM-Management in ``SystemHandler.solve()``\\. 

1009 The solution size depends on the size of the RC network and number of saved time steps. 

1010 kwargs : dict[str: int | Any], optional 

1011 Passed to SystemHandler. 

1012 """ 

1013 if self.get_time_dependent_symbols(): 

1014 assert time_dependent_function 

1015 

1016 continued_simulation = False 

1017 if self.load_solution or self.continue_canceled: 

1018 success = self.rc_solution.load_solution( 

1019 self.pickle_path_solution(t_span, name_add_on), last_time_step=t_span[-1] 

1020 ) 

1021 if not success: 

1022 success = self.rc_solution.load_solution( 

1023 self.pickle_path_result(t_span, name_add_on), last_time_step=t_span[-1] 

1024 ) 

1025 if success or (not isinstance(success, bool) and success == 0): # ==0 because 0 could be the last time 

1026 # step (will never be the case.... -.-) 

1027 if not isinstance(success, bool): 

1028 if not self.continue_canceled: 

1029 self.rc_solution.delete_solution_except_first() 

1030 print(f"Partial solution detected but not used because continue_canceled is False.") 

1031 else: 

1032 print( 

1033 f"Simulation is started at loaded time step {success} on {success / t_span[-1] * 100:.2f} %." 

1034 ) 

1035 t_span = (success, t_span[-1]) 

1036 continued_simulation = True 

1037 else: 

1038 print("Solution was loaded from file.") 

1039 return None 

1040 

1041 if check_courant: 

1042 new_max_step = self.determine_max_step() 

1043 if self.settings.solve_settings.max_step > new_max_step: 

1044 print( 

1045 f"Because of Courant Number: New maximum step is set to {new_max_step} (previous:" 

1046 f" {self.settings.solve_settings.max_step})" 

1047 ) 

1048 self.settings.solve_settings.max_step = new_max_step 

1049 

1050 temperature_vector = np.array(self.temperature_vector).flatten() 

1051 system_matrix = self.system_matrix 

1052 time_symbols = self.get_time_dependent_symbols() 

1053 if time_symbols: 

1054 system_matrix = SparseSymbolicEvaluator(system_matrix, time_symbols) 

1055 

1056 system_handler_kwargs = { 

1057 "system_matrix": system_matrix, 

1058 "rc_solution": self.rc_solution, 

1059 "settings": self.settings, 

1060 "print_progress": print_progress, 

1061 "print_points": np.linspace(t_span[0], t_span[-1], 20 + 1), 

1062 "batch_end": t_span[-1], 

1063 "time_dependent_function": time_dependent_function, 

1064 } 

1065 

1066 # create the time_steps to solve the results (every X seconds a value) 

1067 t_eval = np.linspace( 

1068 t_span[0], t_span[1], max(1, int((t_span[1] - t_span[0]) / self.settings.save_all_x_seconds) + 1) 

1069 ) 

1070 

1071 if self.inhomogeneous_system: 

1072 input_matrix = self.input_matrix 

1073 if time_symbols: 

1074 input_matrix = SparseSymbolicEvaluator(input_matrix, time_symbols) 

1075 input_vector = self.input_vector 

1076 

1077 # Each Input can be called as a function to calculate new boundary values during simulation: 

1078 functions_list = self.rc_objects.inputs 

1079 # Also each Input has some functions that can be called independently of the object that return values for 

1080 # the calculation. 

1081 kwargs_functions = {} 

1082 for item in self.rc_objects.inputs: 

1083 kwargs_functions.update(item.get_kwargs_functions()) # Values should be overwritten so that the same 

1084 # values are calculated only once 

1085 

1086 system_handler_kwargs.update( 

1087 { 

1088 "input_matrix": input_matrix, 

1089 "input_vector": input_vector, 

1090 "functions_list": functions_list, 

1091 "kwargs_functions": kwargs_functions, 

1092 "t_eval": t_eval, 

1093 "first_time": t_span[0], 

1094 } 

1095 ) 

1096 

1097 # only update the keys that are in the predefined dictionary (it would also work if everything is passed anyway) 

1098 system_handler_kwargs.update({k: kwargs[k] for k in system_handler_kwargs if k in kwargs}) 

1099 

1100 system_handler = self.system_handler_type(**system_handler_kwargs) 

1101 save_prefix = f"{self.hash}" 

1102 if name_add_on: 

1103 save_prefix += f"_{name_add_on.removeprefix('_').removesuffix('_')}" 

1104 solve_handler = SolveIVPHandler( 

1105 system_handler=system_handler, 

1106 save_prefix=save_prefix, 

1107 solve_settings=self.settings.solve_settings, 

1108 ) 

1109 if hook_function is not None: 

1110 hook_function() 

1111 solve_handler.solve( 

1112 t_span=t_span, 

1113 y0=temperature_vector, 

1114 t_eval=t_eval, 

1115 continued_simulation=continued_simulation, 

1116 expected_solution_size_mb=expected_solution_size_mb, 

1117 ) 

1118 return None 

1119 

1120 @property 

1121 def system_handler_type(self): 

1122 """ 

1123 Returns the SystemHandler type depending on which system (in/homogeneous) is needed. 

1124 

1125 Returns 

1126 ------- 

1127 type : 

1128 The system handler type. 

1129 """ 

1130 if self.inhomogeneous_system: 

1131 return InhomogeneousSystemHandler 

1132 return HomogeneousSystemHandler 

1133 

1134 @property 

1135 def all_objects(self) -> list: 

1136 return [*(self.rc_objects.resistors or []), *(self.rc_objects.nodes or []), *(self.rc_objects.inputs or [])] 

1137 

1138 @property 

1139 def objects_with_identical_symbols(self) -> list: 

1140 """ 

1141 Get all objects with symbols that are relevant for the system matrix. 

1142 

1143 You cannot just use every object, because some (e.g. Resistors) can be summarized in local groups with one 

1144 symbol representing the whole group, like the equivalent resistance of multiple connected Resistors. 

1145 

1146 No `InternalHeatSource` s are returned, because their symbol is already represented in the corresponding Node. 

1147 

1148 Returns 

1149 ------- 

1150 list : 

1151 All network objects without the ones that are already taken into account by the symbols of another. 

1152 The list has the same order every time it is created. 

1153 """ 

1154 

1155 def filter_resistors(resistor_list): 

1156 # Create set of all resistors for O(1) lookups 

1157 all_resistors = {obj for obj in resistor_list if isinstance(obj, Resistor)} 

1158 to_exclude = set() 

1159 

1160 # Collect all resistors to exclude in order of appearance 

1161 for obj in resistor_list: 

1162 if isinstance(obj, Resistor) and obj not in to_exclude: 

1163 inbetween = set(obj.all_resistors_inbetween) - {obj} 

1164 to_exclude.update(inbetween & all_resistors) 

1165 

1166 # Filter maintaining original order 

1167 return [obj for obj in resistor_list if not obj in to_exclude] 

1168 

1169 result = [rc_obj for rc_obj in self.all_objects if not isinstance(rc_obj, InternalHeatSource)] 

1170 return filter_resistors(result) 

1171 

1172 def get_symbols(self): 

1173 """ 

1174 Returns a list with all symbols, except time dependent symbols. 

1175 

1176 Returns 

1177 ------- 

1178 list : 

1179 All symbols, except time dependent symbols. 

1180 """ 

1181 all_symbols = [symb for rc_object in self.objects_with_identical_symbols for symb in rc_object.symbols] 

1182 return all_symbols 

1183 

1184 def get_values(self): 

1185 """ 

1186 Returns a list with all values, except of time dependent symbols. 

1187 

1188 Returns 

1189 ------- 

1190 list : 

1191 All values, except of time dependent symbols. 

1192 """ 

1193 all_values = [ 

1194 val if isinstance(val, Basic) else np.float64(val) 

1195 for rc_object in self.objects_with_identical_symbols 

1196 for val in rc_object.values 

1197 ] 

1198 return all_values 

1199 

1200 def get_time_dependent_symbols(self) -> list: 

1201 """ 

1202 A list with all time dependent symbols but each of it is only added once. 

1203 

1204 Returns 

1205 ------- 

1206 list : 

1207 The time dependent symbols. 

1208 """ 

1209 symbols = [] 

1210 seen = set() 

1211 for rc_object in self.objects_with_identical_symbols: 

1212 for symb in rc_object.time_dependent_symbols: 

1213 if symb not in seen: 

1214 seen.add(symb) 

1215 symbols.append(symb) 

1216 return symbols 

1217 

1218 def get_symbols_and_values(self) -> tuple[list, list]: 

1219 """ 

1220 Return two lists of first: all symbols and second: all associated values of all rc objects. 

1221 

1222 Returns 

1223 ------- 

1224 tuple[list, list] : 

1225 

1226 """ 

1227 return self.get_symbols(), self.get_values() 

1228 

1229 def make_system_matrices(self): 

1230 """ 

1231 Creates the Jacobian matrices of the RC Network. 

1232 

1233 If self.load_from_pickle the whole network will be loaded from pickle file if it exists to prevent heavy 

1234 calculations. 

1235 

1236 Returns 

1237 ------- 

1238 sympy expression : 

1239 The system matrix as sympy expression. 

1240 """ 

1241 success = False 

1242 if self.load_from_pickle: 

1243 try: 

1244 success = self.load_matrices() 

1245 except Exception: 

1246 pass 

1247 if not success: 

1248 # fill system matrix 

1249 terms = [] 

1250 involved_symbols: list[set] = [] 

1251 temperature_symbols: list[set] = [] 

1252 variables = [] 

1253 for node in self.nodes: 

1254 # works only for one term 

1255 term, t_symbols, all_symbols = node.temperature_derivative_term() 

1256 terms.append(term) 

1257 involved_symbols.append(all_symbols) 

1258 temperature_symbols.append(t_symbols) 

1259 variables.append(node.temperature_symbol) 

1260 

1261 self._system_matrix_symbol = build_jacobian( 

1262 terms, variables, temperature_symbols, num_cores=self.num_cores_jacobian 

1263 ) 

1264 print("system matrix created") 

1265 

1266 self.save_matrices() 

1267 

1268 input_variables = list(self.input_vector_symbol) 

1269 self._input_matrix_symbol = build_jacobian( 

1270 terms, input_variables, involved_symbols, num_cores=self.num_cores_jacobian 

1271 ) 

1272 print("input matrix created") 

1273 

1274 self.save_matrices() 

1275 

1276 def check_courant(self, time_step: float): 

1277 """ 

1278 Calculates the Courant number for the whole network and raises an error if its larger than 1. 

1279 

1280 Parameters 

1281 ---------- 

1282 time_step : float 

1283 The maximum time step in seconds. 

1284 

1285 Raises 

1286 ------ 

1287 HighCourantNumberError : 

1288 If the courant number is greater than 1 the network will calculate shit. 

1289 

1290 """ 

1291 # TODO: Do not use the courant number but return a critical maximum time_step to easy raise an error during 

1292 # iterations. So: no iteration, but calculate the maximum time_step and return this so that it can be set. 

1293 mass_flow_nodes = self.rc_objects.mass_flow_nodes 

1294 mass_flow_node: MassFlowNode 

1295 for mass_flow_node in mass_flow_nodes: 

1296 if mass_flow_node.courant_number(time_step) > 1: 

1297 raise HighCourantNumberError 

1298 

1299 def matrix_to_latex_diag(self): 

1300 matrix = self.system_matrix_symbol 

1301 diag_elements = matrix.diagonal() 

1302 matrix_no_diag = matrix - diag(*diag_elements) 

1303 

1304 matrix_latex = latex(matrix_no_diag) 

1305 diag_latex = latex(Matrix(diag_elements).reshape(len(diag_elements), 1)) 

1306 

1307 return f"{matrix_latex} + \\mathrm{{diag}}\\left({diag_latex}\\right)"