Coverage for pyrc \ core \ network.py: 70%
416 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-07 08:20 +0200
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-07 08:20 +0200
1# -------------------------------------------------------------------------------
2# Copyright (C) 2026 Joel Kimmich, Tim Jourdan
3# ------------------------------------------------------------------------------
4# License
5# This file is part of PyRC, distributed under GPL-3.0-or-later.
6# ------------------------------------------------------------------------------
8from __future__ import annotations
9import os
10import pickle
11from abc import abstractmethod
12from copy import copy
13from typing import Any, TYPE_CHECKING, Callable
15import numpy as np
16from sympy import lambdify, latex, SparseMatrix, Matrix, diag, Symbol, Basic, ImmutableSparseMatrix, MutableSparseMatrix
17import sympy as sym
18import hashlib
19import networkx as nx
20from scipy.sparse import coo_matrix
22from pyrc.core.solver.stationary import solve_stationary
23from pyrc.paths import run_folder
24from pyrc.core.components.resistor import Resistor
25from pyrc.core.inputs import InternalHeatSource
26from pyrc.core.settings import initial_settings, Settings
27from pyrc.core.solver.symbolic import SparseSymbolicEvaluator
28from pyrc.core.solver.handler import InhomogeneousSystemHandler, SolveIVPHandler, HomogeneousSystemHandler
29from pyrc.tools.errors import HighCourantNumberError
30from pyrc.tools.functions import add_leading_underscore
32from pyrc.tools.science import build_jacobian
33from pyrc.core.components.templates import RCObjects, RCSolution, EquationItem, initial_rc_objects, Cell
34from pyrc.core.components.templates import solution_object
35from pyrc.core.components.capacitor import Capacitor
37if TYPE_CHECKING:
38 from pyrc.core.nodes import MassFlowNode
39 from pyrc.core.components.templates import EquationItemInput
40 from scipy.sparse import spmatrix, sparray
43class RCNetwork:
44 def __init__(
45 self,
46 save_to_pickle: bool = True,
47 load_from_pickle: bool = True,
48 load_solution: bool = True,
49 num_cores_jacobian: int = 1, # TODO: Parallelization only works on linux currently (because it forks instead
50 # of spawning it). Problem is: Sympy symbols are not picklable.
51 settings: Settings = initial_settings,
52 rc_objects: RCObjects = initial_rc_objects,
53 rc_solution: RCSolution = solution_object,
54 # strategy: str = "lambdify",
55 continue_canceled: bool = True,
56 ):
57 """
59 Parameters
60 ----------
61 save_to_pickle
62 load_from_pickle
63 load_solution
64 num_cores_jacobian
65 settings
66 rc_objects
67 rc_solution
68 continue_canceled : bool, optional
69 If True, continue canceled simulations.
70 Overwrites load_solution
71 """
72 self.rc_objects: RCObjects = rc_objects
73 self.settings = settings
74 self._system_matrix_symbol: MutableSparseMatrix = None
75 self._input_matrix_symbol: MutableSparseMatrix = None
76 self._temperature_vector_symbol = None
77 self._input_vector_symbol = None
78 self._input_matrix = None
79 self._system_matrix = None
80 self.__hash = None
81 self.rc_solution: RCSolution = rc_solution
82 # link rc objects to rc solution
83 self.rc_solution.rc_objects = rc_objects
84 # self.strategy = strategy
86 # For each time dependent symbol the following values are not allowed because they would make an entry in the
87 # system matrix be infinity
88 self.forbidden_values: dict[Symbol, set] = {}
90 self.save_to_pickle: bool = save_to_pickle
91 self.load_from_pickle: bool = load_from_pickle
92 self.load_solution: bool = load_solution
93 self.continue_canceled: bool = continue_canceled
95 if num_cores_jacobian is None:
96 num_cores_jacobian = os.cpu_count()
97 self.num_cores_jacobian = min(max(num_cores_jacobian, 1), os.cpu_count())
99 self.groups: dict[str, Capacitor] = {}
101 def __getattr__(self, item):
102 if getattr(self.rc_objects, item, None):
103 return self.rc_objects.__getattribute__(item)
104 return AttributeError
106 def create_network(self, *args, **kwargs):
107 EquationItem.reset_counter()
108 Resistor.reset_counter()
109 self.rc_objects.wipe_all()
110 self._create_network(*args, **kwargs)
111 assert self.nodes is not None
113 @abstractmethod
114 def _create_network(self, *args, **kwargs):
115 pass
117 def copy_dict(self):
118 return {
119 "save_to_pickle": self.save_to_pickle,
120 "load_from_pickle": self.load_from_pickle,
121 "load_solution": self.load_solution,
122 "num_cores_jacobian": self.num_cores_jacobian,
123 "settings": copy(self.settings),
124 }
126 def __copy__(self):
127 return RCNetwork(**self.copy_dict())
129 @property
130 def nodes(self):
131 return self.rc_objects.nodes
133 @property
134 def hash(self):
135 """
136 Compute a deterministic hash for the :class:`RCNetwork` where node identity (class name + obj.id) and topology matter.
138 For :class:`Capacitor` instances, an optional internal_heat_source (identified by its id) is encoded as a node attribute.
140 For :class:`Cell` subclass instances, position and delta arrays are encoded.
142 Returns
143 -------
144 str :
145 SHA-256 hex digest of the canonical edge and node representation.
146 """
147 if self.__hash is None:
149 def node_key(_obj) -> str:
150 return f"{type(_obj).__name__}:{_obj.id}"
152 def array_str(arr: np.ndarray) -> str:
153 return ",".join(f"{v:.17g}" for v in arr.ravel())
155 graph = nx.Graph()
156 for obj in self.rc_objects.all:
157 attrs = {}
158 if isinstance(obj, Capacitor):
159 attrs["heat_source_id"] = (
160 obj.internal_heat_source.id if obj.internal_heat_source is not None else None
161 )
162 if isinstance(obj, Cell):
163 attrs["position"] = array_str(obj.position)
164 attrs["delta"] = array_str(obj.delta)
165 graph.add_node(node_key(obj), **attrs)
167 # loop over all linked/connected network objects
168 for obj in self.rc_objects.all:
169 for neighbour in obj.neighbours:
170 graph.add_edge(node_key(obj), node_key(neighbour))
172 node_str = "|".join(
173 f"{nid}#{data}"
174 for nid, data in sorted((nid, str(sorted(data.items()))) for nid, data in graph.nodes(data=True))
175 )
176 edge_str = "|".join(sorted(f"{min(u, v)}-{max(u, v)}" for u, v in graph.edges()))
177 canonical = f"nodes:{node_str};edges:{edge_str}"
178 self.__hash = hashlib.sha256(canonical.encode()).hexdigest()
179 return self.__hash
181 @property
182 def time_steps(self) -> list:
183 """
184 Returns a list with all time steps.
186 Returns
187 -------
188 list :
189 The time steps.
190 """
191 return self.rc_solution.time_steps
193 def reset_properties(self):
194 self._temperature_vector_symbol = None
196 @property
197 def inputs(self) -> list[EquationItemInput]:
198 return self.rc_objects.inputs
200 def change_static_dynamic(self, calculate_static: bool | str):
201 if isinstance(calculate_static, str):
202 match calculate_static.lower():
203 case "static" | "statisch" | "s":
204 calculate_static = True
205 case _:
206 calculate_static = False
207 self.settings.calculate_static = calculate_static
209 def get_node_by_id(self, _id: str | int):
210 """
211 Returns the node with the given id.
213 Parameters
214 ----------
215 _id : str | int
216 The id of the node.
218 Returns
219 -------
220 TemperatureNode | InternalHeatSource :
221 """
222 if isinstance(_id, str):
223 _id = int(_id)
224 return next((x for x in self.rc_objects.all_equation_objects if x.id == _id), None)
226 def no_infinity(self, sp_matrix: SparseMatrix | spmatrix | sparray) -> bool:
227 """
228 Checks the given sparse matrix for infinity entries and symbolic singularities.
230 Notes
231 -----
232 Saves a dict, which maps each free sympy symbol to a set of values at which at least one symbolic entry
233 diverges. Empty if no symbolic entries exist.
235 Parameters
236 ----------
237 sp_matrix : SparseMatrix | spmatrix | sparray
238 A sparse matrix/vector to check, may contain numeric values or sympy
239 expressions.
241 Returns
242 -------
243 tuple[bool, dict] :
244 bool : True if no numeric infinity was found.
245 """
246 sp_matrix: SparseMatrix | spmatrix | sparray
247 if isinstance(sp_matrix, SparseMatrix) or isinstance(sp_matrix, ImmutableSparseMatrix):
248 entries = sp_matrix.todok().values()
249 else:
250 sp_matrix: spmatrix | sparray
251 entries = sp_matrix.data
253 for entry in entries:
254 if isinstance(entry, Basic):
255 if not entry.free_symbols:
256 if entry in (sym.oo, -sym.oo, sym.zoo, sym.nan):
257 return False
258 else:
259 _, denominator = sym.fraction(sym.cancel(entry))
260 if denominator != sym.Integer(1):
261 for s in denominator.free_symbols:
262 solution = sym.solve(denominator, s)
263 self.forbidden_values.setdefault(s, set()).update(solution)
264 else:
265 if np.isinf(entry):
266 return False
268 return True
270 def _get_matrix_symbol(self, key: str) -> SparseMatrix | ImmutableSparseMatrix:
271 """
272 Returns the desired matrix in symbolic notation.
274 Parameters
275 ----------
276 key : str
277 Either "symbol" or "input" to determine the matrix type.
279 Returns
280 -------
281 SparseMatrix | ImmutableSparseMatrix :
282 The (sympy) symbolic notation of the matrix.
283 """
284 attr = f"_{key}_matrix_symbol"
285 if getattr(self, attr) is None:
286 self.make_system_matrices()
287 return getattr(self, attr)
289 def _build_matrix_function(self, key: str) -> Callable:
290 """
291 Builds the desired matrix lambda function using the symbols matrix, respectively.
293 Parameters
294 ----------
295 key : str
296 Either "symbol" or "input" to determine the matrix type.
298 Returns
299 -------
300 Callable :
301 The function of the matrix.
302 """
303 matrix_symbol = self._get_matrix_symbol(key)
304 all_symbols = self.get_symbols()
305 if self.get_time_dependent_symbols():
306 ordered_symbols = [
307 next((s for s in matrix_symbol.free_symbols if s.name == symbol.name), symbol) for symbol in all_symbols
308 ]
309 return lambdify(ordered_symbols, matrix_symbol, "sympy")
310 return lambdify(all_symbols, matrix_symbol, "scipy")
312 def _build_matrix_direct(self, key: str) -> spmatrix:
313 """
314 Bypasses lambdify by substituting all symbol values via name-based xreplace.
316 Parameters
317 ----------
318 key : str
319 Either "symbol" or "input" to determine the matrix type.
321 Notes
322 -----
323 The replace must be done using the name of the symbols and not the objects itself because they might be loaded
324 from disk and might not match, but the names do.
326 Returns
327 -------
328 scipy.spmatrix :
329 A scipy sparse matrix with all values substituted.
330 """
331 matrix_symbol = self._get_matrix_symbol(key)
332 name_to_value = {s.name: v for s, v in zip(self.get_symbols(), self.get_values())}
333 free = matrix_symbol.free_symbols
334 substitution_map = {s: name_to_value[s.name] for s in free if s.name in name_to_value}
336 dok = matrix_symbol.todok()
337 rows, cols, data = [], [], []
338 for (i, j), expr in dok.items():
339 rows.append(i)
340 cols.append(j)
341 data.append(float(expr.xreplace(substitution_map)))
343 return coo_matrix((data, (rows, cols)), shape=matrix_symbol.shape).tocsr()
345 # import warnings
346 # import logging
347 # import pickle
348 # from concurrent.futures import ProcessPoolExecutor
349 # from pathlib import Path
350 # from typing import Callable
351 #
352 # import numba
353 # import numpy as np
354 # from scipy.sparse import coo_matrix, csr_matrix
355 # from sympy import lambdify
356 # from sympy.printing import ccode
357 #
358 # def _xreplace_element(args: tuple) -> object:
359 # """Apply xreplace to a single expression."""
360 # expr, static_map = args
361 # return expr.xreplace(static_map)
362 #
363 # def _build_c_source(reduced_exprs: list, td_ordered: list, func_name: str) -> str:
364 # """
365 # Generates a C source file with a single function iterating over all nonzero
366 # expressions.
367 #
368 # Parameters
369 # ----------
370 # reduced_exprs : list
371 # Sympy expressions with static symbols already substituted.
372 # td_ordered : list
373 # Ordered time-dependent sympy symbols.
374 # func_name : str
375 # Name of the generated C function.
376 #
377 # Returns
378 # -------
379 # str :
380 # C source code as a string.
381 # """
382 # args = ", ".join(f"double {s.name}" for s in td_ordered)
383 # lines = [
384 # "#include <stddef.h>",
385 # f"void {func_name}({args}, double *out, size_t n) {{",
386 # ]
387 # for i, expr in enumerate(reduced_exprs):
388 # lines.append(f" out[{i}] = {ccode(expr)};")
389 # lines.append("}")
390 # return "\n".join(lines)
391 #
392 # def _compile_c_function(
393 # source: str,
394 # func_name: str,
395 # n_out: int,
396 # cache_path: Path,
397 # ) -> Callable:
398 # """
399 # Compiles C source to a shared library and returns a callable via ctypes.
400 #
401 # Parameters
402 # ----------
403 # source : str
404 # C source code string.
405 # func_name : str
406 # Name of the C function to load.
407 # n_out : int
408 # Number of output values (nnz).
409 # cache_path : Path
410 # Path to write the compiled .so / .dll.
411 #
412 # Returns
413 # -------
414 # Callable :
415 # Python callable wrapping the compiled C function.
416 # """
417 # import ctypes
418 # import subprocess
419 # import tempfile
420 #
421 # src_path = cache_path.with_suffix(".c")
422 # src_path.write_text(source)
423 #
424 # result = subprocess.run(
425 # ["gcc", "-O3", "-shared", "-fPIC", "-o", str(cache_path), str(src_path)],
426 # capture_output=True,
427 # text=True,
428 # )
429 # if result.returncode != 0:
430 # raise RuntimeError(result.stderr)
431 #
432 # lib = ctypes.CDLL(str(cache_path))
433 # fn = getattr(lib, func_name)
434 # fn.restype = None
435 #
436 # out_type = ctypes.c_double * n_out
437 #
438 # def c_callable(*td_vals: float) -> np.ndarray:
439 # out = out_type()
440 # fn(*[ctypes.c_double(v) for v in td_vals], out, ctypes.c_size_t(n_out))
441 # return np.frombuffer(out, dtype=np.float64).copy()
442 #
443 # return c_callable
445 # def _prepare_matrix_ingredients(
446 # self, key: str, parallel: bool
447 # ) -> tuple[list, list, np.ndarray, np.ndarray, tuple, list]:
448 # """
449 # Extracts and reduces nonzero expressions, substituting all static symbols.
450 #
451 # Parameters
452 # ----------
453 # key : str
454 # Either "symbol" or "input" to determine the matrix type.
455 # parallel : bool
456 # If True, parallelizes static substitution via ProcessPoolExecutor.
457 #
458 # Returns
459 # -------
460 # tuple :
461 # td_ordered, reduced_exprs, rows, cols, shape, raw exprs (unreduced)
462 # """
463 # matrix_symbol = self._get_matrix_symbol(key)
464 # all_symbols = self.get_symbols()
465 # all_values = self.get_values()
466 # td_symbols = self.get_time_dependent_symbols()
467 # td_names = {s.name for s in td_symbols}
468 #
469 # static_map = {
470 # s_mat: val
471 # for sym, val in zip(all_symbols, all_values)
472 # if sym.name not in td_names
473 # for s_mat in matrix_symbol.free_symbols
474 # if s_mat.name == sym.name
475 # }
476 #
477 # td_ordered = [
478 # next(s for s in matrix_symbol.free_symbols if s.name == td.name)
479 # for td in td_symbols
480 # if any(s.name == td.name for s in matrix_symbol.free_symbols)
481 # ]
482 #
483 # dok = matrix_symbol.todok()
484 # shape = matrix_symbol.shape
485 # keys, exprs = zip(*dok.items()) if dok else ([], [])
486 # rows = np.array([k[0] for k in keys], dtype=np.int32)
487 # cols = np.array([k[1] for k in keys], dtype=np.int32)
488 #
489 # TODO: do not do it like that, it is worse than the solution that already exists.
490 # if parallel:
491 # tasks = [(expr, static_map) for expr in exprs]
492 # with ProcessPoolExecutor() as executor:
493 # reduced_exprs = list(executor.map(_xreplace_element, tasks))
494 # else:
495 # reduced_exprs = [expr.xreplace(static_map) for expr in exprs]
496 #
497 # return td_ordered, reduced_exprs, rows, cols, shape
498 #
499 # TODO: This is worse than the current solution, but the other version (numby/c...) should be implemented to speed
500 # things up. But attention: Currently, it is just vibe coded...
501 # def _build_matrix_function_lambdify(
502 # self, key: str, parallel: bool = False
503 # ) -> Callable:
504 # """
505 # Builds the matrix function using lambdify with numpy backend.
506 #
507 # Parameters
508 # ----------
509 # key : str
510 # Either "symbol" or "input" to determine the matrix type.
511 # parallel : bool
512 # If True, parallelizes static symbol substitution.
513 #
514 # Returns
515 # -------
516 # Callable :
517 # Function accepting td symbol values, returning a csr_matrix.
518 # """
519 # td_ordered, reduced_exprs, rows, cols, shape = (
520 # self._prepare_matrix_ingredients(key, parallel)
521 # )
522 #
523 # if not len(rows):
524 # empty = csr_matrix(shape, dtype=float)
525 #
526 # def assembled(*_) -> csr_matrix:
527 # return empty
528 #
529 # return assembled
530 #
531 # data_func = lambdify(td_ordered, reduced_exprs, "numpy")
532 # template = coo_matrix(
533 # (np.ones(len(rows), dtype=float), (rows, cols)), shape=shape
534 # ).tocsr()
535 #
536 # def assembled(*td_vals: float) -> csr_matrix:
537 # template.data[:] = np.asarray(data_func(*td_vals), dtype=float)
538 # return template
539 #
540 # return assembled
541 #
542 # def _build_matrix_function_numba(self, key: str, parallel: bool = False) -> Callable:
543 # """
544 # Builds the matrix function using a numba-JIT compiled callable, with
545 # caching to self.save_folder_path keyed by self.hash.
546 #
547 # Parameters
548 # ----------
549 # key : str
550 # Either "symbol" or "input" to determine the matrix type.
551 # parallel : bool
552 # If True, parallelizes static symbol substitution.
553 #
554 # Returns
555 # -------
556 # Callable :
557 # Function accepting td symbol values, returning a csr_matrix.
558 # """
559 # td_ordered, reduced_exprs, rows, cols, shape = (
560 # self._prepare_matrix_ingredients(key, parallel)
561 # )
562 #
563 # if not len(rows):
564 # empty = csr_matrix(shape, dtype=float)
565 #
566 # def assembled(*_) -> csr_matrix:
567 # return empty
568 #
569 # return assembled
570 #
571 # cache_path = Path(self.save_folder_path) / f"{self.hash}_{key}_numba.pkl"
572 #
573 # if cache_path.exists():
574 # with open(cache_path, "rb") as f:
575 # data_func = pickle.load(f)
576 # else:
577 # data_func = lambdify(td_ordered, reduced_exprs, "numpy")
578 # data_func_nb = numba.njit(data_func, cache=True)
579 # with open(cache_path, "wb") as f:
580 # pickle.dump(data_func_nb, f)
581 # data_func = data_func_nb
582 #
583 # template = coo_matrix(
584 # (np.ones(len(rows), dtype=float), (rows, cols)), shape=shape
585 # ).tocsr()
586 #
587 # def assembled(*td_vals: float) -> csr_matrix:
588 # template.data[:] = np.asarray(data_func(*td_vals), dtype=float)
589 # return template
590 #
591 # return assembled
592 #
593 # def _build_matrix_function_c(self, key: str, parallel: bool = False) -> Callable:
594 # """
595 # Builds the matrix function using a compiled C shared library via ctypes,
596 # falling back to numba if compilation fails.
597 #
598 # Parameters
599 # ----------
600 # key : str
601 # Either "symbol" or "input" to determine the matrix type.
602 # parallel : bool
603 # If True, parallelizes static symbol substitution.
604 #
605 # Returns
606 # -------
607 # Callable :
608 # Function accepting td symbol values, returning a csr_matrix.
609 # """
610 # td_ordered, reduced_exprs, rows, cols, shape = (
611 # self._prepare_matrix_ingredients(key, parallel)
612 # )
613 #
614 # if not len(rows):
615 # empty = csr_matrix(shape, dtype=float)
616 #
617 # def assembled(*_) -> csr_matrix:
618 # return empty
619 #
620 # return assembled
621 #
622 # func_name = f"matrix_func_{self.hash}_{key}"
623 # so_path = Path(self.save_folder_path) / f"{func_name}.so"
624 #
625 # try:
626 # if so_path.exists():
627 # import ctypes
628 # lib = ctypes.CDLL(str(so_path))
629 # fn = getattr(lib, func_name)
630 # n_out = len(rows)
631 # out_type = ctypes.c_double * n_out
632 #
633 # def data_func(*td_vals: float) -> np.ndarray:
634 # out = out_type()
635 # fn(
636 # *[ctypes.c_double(v) for v in td_vals],
637 # out,
638 # ctypes.c_size_t(n_out),
639 # )
640 # return np.frombuffer(out, dtype=np.float64).copy()
641 # else:
642 # source = _build_c_source(reduced_exprs, td_ordered, func_name)
643 # data_func = _compile_c_function(
644 # source, func_name, len(rows), so_path
645 # )
646 # except Exception as e:
647 # print(
648 # f"C compilation failed for '{key}' matrix ({e}), falling back to numba."
649 # )
650 # return self._build_matrix_function_numba(key, parallel=parallel)
651 #
652 # template = coo_matrix(
653 # (np.ones(len(rows), dtype=float), (rows, cols)), shape=shape
654 # ).tocsr()
655 #
656 # def assembled(*td_vals: float) -> csr_matrix:
657 # template.data[:] = np.asarray(data_func(*td_vals), dtype=float)
658 # return template
659 #
660 # return assembled
661 #
662 # def _build_matrix_function(
663 # self, key: str, strategy: str | None = None
664 # ) -> Callable:
665 # """
666 # Dispatcher selecting the matrix function build strategy.
667 #
668 # Parameters
669 # ----------
670 # key : str
671 # Either "symbol" or "input" to determine the matrix type.
672 # strategy : str | None
673 # One of "lambdify", "parallel", "numba", "c". If None, falls back
674 # to self.strategy set at __init__.
675 #
676 # Returns
677 # -------
678 # Callable :
679 # Function accepting td symbol values, returning a csr_matrix.
680 # """
681 # s = strategy if strategy is not None else self.strategy
682 # if s == "lambdify":
683 # return self._build_matrix_function_lambdify(key, parallel=False)
684 # if s == "parallel":
685 # return self._build_matrix_function_lambdify(key, parallel=True)
686 # if s == "numba":
687 # return self._build_matrix_function_numba(key, parallel=False)
688 # if s == "c":
689 # return self._build_matrix_function_c(key, parallel=False)
690 # raise ValueError(f"Unknown strategy '{s}'. Choose from: lambdify, parallel, numba, c.")
691 #
692 # def _build_matrix(
693 # self, key: str, strategy: str | None = None
694 # ) -> csr_matrix:
695 # """
696 # Inputs all values into the desired matrix function.
697 #
698 # Parameters
699 # ----------
700 # key : str
701 # Either "symbol" or "input" to determine the matrix type.
702 # strategy : str | None
703 # Overrides self.strategy if provided.
704 #
705 # Returns
706 # -------
707 # csr_matrix :
708 # The desired matrix.
709 # """
710 # attr = f"_{key}_matrix"
711 # if getattr(self, attr) is None:
712 # if not self.get_time_dependent_symbols():
713 # result = self._build_matrix_direct(key)
714 # else:
715 # result = self._build_matrix_function(key, strategy=strategy)(
716 # *self.get_values()
717 # )
718 # assert self.no_infinity(result), (
719 # f"Infinity detected in {key} matrix. System cannot be solved."
720 # )
721 # setattr(self, attr, result)
722 # return getattr(self, attr)
724 def _build_matrix(self, key: str) -> SparseMatrix | ImmutableSparseMatrix | spmatrix | sparray:
725 """
726 Inputs all values (can be symbolic values) into the desired matrix function.
728 Parameters
729 ----------
730 key : str
731 Either "symbol" or "input" to determine the matrix type.
733 Returns
734 -------
735 SparseMatrix | ImmutableSparseMatrix | spmatrix | sparray :
736 The desired matrix. If no time dependent variables are used, it is a scipy sparse matrix, otherwise it's a
737 sympy SparseMatrix.
738 """
739 attr = f"_{key}_matrix"
740 if getattr(self, attr) is None:
741 if self.get_time_dependent_symbols():
742 result = self._build_matrix_function(key)(*self.get_values())
743 else:
744 result = self._build_matrix_direct(key)
745 assert self.no_infinity(result), f"Infinity detected in {key} matrix. System cannot be solved."
746 setattr(self, attr, result)
747 return getattr(self, attr)
749 @property
750 def system_matrix_symbol(self) -> SparseMatrix | ImmutableSparseMatrix:
751 return self._get_matrix_symbol("system")
753 @property
754 def input_matrix_symbol(self) -> SparseMatrix | ImmutableSparseMatrix:
755 return self._get_matrix_symbol("input")
757 @property
758 def system_matrix_function(self) -> Callable:
759 return self._build_matrix_function("system")
761 @property
762 def input_matrix_function(self) -> Callable:
763 return self._build_matrix_function("input")
765 @property
766 def system_matrix(self) -> SparseMatrix | ImmutableSparseMatrix | spmatrix | sparray:
767 return self._build_matrix("system")
769 @property
770 def input_matrix(self) -> SparseMatrix | ImmutableSparseMatrix | spmatrix | sparray:
771 return self._build_matrix("input")
773 @property
774 def temperature_vector_symbol(self) -> list:
775 if self._temperature_vector_symbol is None:
776 result = [node.temperature_symbol for node in self.nodes]
777 self._temperature_vector_symbol = result
778 return self._temperature_vector_symbol
780 @property
781 def temperature_vector(self) -> np.ndarray:
782 result = [node.temperature for node in self.nodes]
783 return np.array(result).flatten()
785 @property
786 def input_vector(self) -> np.ndarray:
787 result = [n for input_item in self.inputs for n in input_item.values]
788 return np.array(result).flatten()
790 @property
791 def input_vector_symbol(self) -> np.ndarray:
792 if self._input_vector_symbol is None:
793 result = []
794 if self.inputs is not None and self.inputs != []:
795 result = [n for input_item in self.inputs for n in input_item.symbols]
796 self._input_vector_symbol = np.array(result).flatten()
797 return self._input_vector_symbol
799 # @property
800 # def seconds_of_day(self):
801 # """
802 # Returns the seconds that have already passed on the day of ``self.start_time``.
803 #
804 # Returns
805 # -------
806 # float | int :
807 # The passed seconds of the day.
808 # """
809 # return (self.settings.start_date - self.settings.start_date.astype("datetime64[D]")) / np.timedelta64(1, "s")
811 @property
812 def save_folder_path(self):
813 if self.settings.save_folder_path is None:
814 return run_folder
815 return self.settings.save_folder_path
817 @property
818 def pickle_path(self) -> str:
819 filename = f"{self.hash}.pickle"
820 return os.path.normpath(os.path.join(self.save_folder_path, filename))
822 def pickle_path_result(self, t_span, name_add_on=None):
823 name_add_on = add_leading_underscore(name_add_on)
824 filename = f"{self.hash}{name_add_on}_{int(t_span[1])}_result.pickle"
825 return os.path.normpath(os.path.join(self.save_folder_path, filename))
827 @property
828 def pickle_path_single_solution(self) -> str:
829 filename = f"{self.hash}_single_solution.pickle"
830 return os.path.normpath(os.path.join(self.save_folder_path, filename))
832 def pickle_path_solution(self, t_span, name_add_on=None) -> str:
833 name_add_on = add_leading_underscore(name_add_on)
834 filename = f"{self.hash}{name_add_on}_{int(t_span[0])}_{int(t_span[1])}_solution.pickle"
835 return os.path.normpath(os.path.join(self.save_folder_path, filename))
837 def save_matrices(self):
838 if self.save_to_pickle:
839 file_path = self.pickle_path
840 matrices = [
841 self._system_matrix_symbol,
842 self._input_matrix_symbol,
843 self._input_vector_symbol,
844 self._temperature_vector_symbol,
845 self.forbidden_values,
846 ]
847 with open(file_path, "wb") as f:
848 pickle.dump(matrices, f)
850 def load_matrices(self):
851 file_path = self.pickle_path
852 if os.path.exists(file_path):
853 with open(file_path, "rb") as f:
854 matrices = pickle.load(f)
855 self._system_matrix_symbol = matrices[0]
856 self._input_matrix_symbol = matrices[1]
857 self._input_vector_symbol = matrices[2]
858 self._temperature_vector_symbol = matrices[3]
859 self.forbidden_values = matrices[4]
860 print("matrices loaded")
861 return True
862 else:
863 print("No network found nor loaded.")
864 return False
866 def save_last_step_solution(self, pickle_path_single_solution: str = None):
867 """
868 Saves the last time step solution.
869 """
870 if pickle_path_single_solution is None:
871 pickle_path_single_solution = self.pickle_path_single_solution
872 file_path = pickle_path_single_solution
873 self.rc_solution.save_last_step(file_path)
875 def load_initial_values(self, return_bool: bool = False, pickle_path_single_solution: str = None) -> None | bool:
876 """
877 Loads the last time step solution (temperature vector) and sets it as initial values.
878 """
879 if pickle_path_single_solution is None:
880 pickle_path_single_solution = self.pickle_path_single_solution
881 file_path = pickle_path_single_solution
882 if os.path.exists(file_path):
883 with open(file_path, "rb") as f:
884 solution_tuple = pickle.load(f)
885 temp_vector = solution_tuple[0]
886 input_vector = solution_tuple[1]
887 for node, temp in zip(self.rc_objects.nodes, temp_vector):
888 node.initial_temperature = temp
889 for item, value in zip(self.inputs, input_vector):
890 item.initial_value = value
891 if return_bool:
892 return True
893 else:
894 print("No temperature vector found nor loaded.")
895 if return_bool:
896 return False
897 return None
899 @property
900 def inhomogeneous_system(self) -> bool:
901 """
902 Returns True if inputs are existing and the network is inhomogeneous. False otherwise.
904 Returns
905 -------
906 bool :
907 True if inputs are existing and the network is inhomogeneous. False otherwise.
908 """
909 if self.rc_objects.inputs is None or self.rc_objects.inputs == []:
910 return False
911 return True
913 def determine_max_step(self) -> float:
914 max_step = 1
915 if self.rc_objects.mass_flow_nodes is not None or self.rc_objects.mass_flow_nodes != []:
916 loop_counter = 0
917 while loop_counter < 1000:
918 try:
919 self.check_courant(max_step) # approximated maximum time step used during iterations.
920 # if no error
921 break
922 except HighCourantNumberError:
923 loop_counter += 1
924 max_step *= 0.99
925 if loop_counter >= 1000:
926 raise HighCourantNumberError
927 return max_step
929 def solve_stationary(self, time_step: float | int = 0):
930 """
931 Determines the analytic solution of the network (if it exists). It works for symbolic and numeric matrices.
933 Parameters
934 ----------
935 time_step : float | int, optional
936 The time step as which the solution is saved in the rc_solution object.
937 This parameter has no effect if the network is symbolic.
939 Returns
940 -------
941 sp.Matrix | MutableDenseMatrix or None:
942 Stationary symbolic temperature vector of shape (n, 1) (only if result is symbolic, otherwise nothing is
943 returned and the result is written to the rc_solution object using time_step).
944 """
945 if self.inhomogeneous_system:
946 result = solve_stationary(
947 system_matrix=self.system_matrix,
948 input_matrix=self.input_matrix,
949 input_vector=self.input_vector,
950 )
951 else:
952 result = solve_stationary(
953 system_matrix=self.system_matrix,
954 )
956 if isinstance(result, Basic) and result.free_symbols is not None:
957 # return symbolic result
958 return result
959 else:
960 # save result in rc_solution
961 self.rc_solution.add_to_solution([np.array(time_step).reshape(-1,)], [np.array(result).reshape(-1, 1)])
963 def solve_network(
964 self,
965 t_span: tuple | Any = None,
966 print_progress=False,
967 name_add_on: str = "",
968 check_courant: bool = True,
969 time_dependent_function: Callable = None,
970 hook_function: Callable = None,
971 expected_solution_size_mb=5000,
972 **kwargs: dict[str, int | Any],
973 ) -> None:
974 """
975 Solves the network at the given times.
977 If time_vector is None, one second is calculated.
979 Parameters
980 ----------
981 t_span : tuple | Any
982 Interval of integration (t0, tf). The solver starts with t=t0 and integrates until it reaches t=tf.
983 Both t0 and tf must be floats or values interpretable by the float conversion function.
984 print_progress : bool, optional
985 If True, some print-outs are made during solving to get the time step that is currently simulated.
986 name_add_on : str, optional
987 Optional add-on to the name of in-between saves that is placed after the hash value (separated by "_").
988 Example save name:
989 42395d3d9f07f06ce9c9cb609b406aa54fc2dc5e8c4d9cc75987d9a02541ad2f_nameAddOn_zw_000001080_h.pickle
990 check_courant : bool, optional
991 If True, self.check_courant(0.4) is run before simulating.
992 time_dependent_function : Callable
993 A function returning the values for all time dependent symbols that are in the matrices in the same order as
994 self.get_time_dependent_symbols().
995 This function is required if time dependent symbols exist.
996 It must return an iterable (e.g. list)
997 hook_function : Callable, optional
998 A function that is run before the network is solved using solve_ivp.
999 Can be used to catch the time before the solving actually starts (e.g. used in `PerformanceTest`\\).
1000 expected_solution_size_mb : int | float, optional
1001 The expected solution size in Megabytes. Is used for RAM-Management in ``SystemHandler.solve()``\\.
1002 The solution size depends on the size of the RC network and number of saved time steps.
1003 kwargs : dict[str: int | Any], optional
1004 Passed to SystemHandler.
1005 """
1006 if self.get_time_dependent_symbols():
1007 assert time_dependent_function
1009 continued_simulation = False
1010 if self.load_solution or self.continue_canceled:
1011 success = self.rc_solution.load_solution(
1012 self.pickle_path_solution(t_span, name_add_on), last_time_step=t_span[-1]
1013 )
1014 if not success:
1015 success = self.rc_solution.load_solution(
1016 self.pickle_path_result(t_span, name_add_on), last_time_step=t_span[-1]
1017 )
1018 if success or (not isinstance(success, bool) and success == 0): # ==0 because 0 could be the last time
1019 # step (will never be the case.... -.-)
1020 if not isinstance(success, bool):
1021 if not self.continue_canceled:
1022 self.rc_solution.delete_solution_except_first()
1023 print(f"Partial solution detected but not used because continue_canceled is False.")
1024 else:
1025 print(
1026 f"Simulation is started at loaded time step {success} on {success / t_span[-1] * 100:.2f} %."
1027 )
1028 t_span = (success, t_span[-1])
1029 continued_simulation = True
1030 else:
1031 print("Solution was loaded from file.")
1032 return None
1034 if check_courant:
1035 new_max_step = self.determine_max_step()
1036 if self.settings.solve_settings.max_step > new_max_step:
1037 print(
1038 f"Because of Courant Number: New maximum step is set to {new_max_step} (previous:"
1039 f" {self.settings.solve_settings.max_step})"
1040 )
1041 self.settings.solve_settings.max_step = new_max_step
1043 temperature_vector = np.array(self.temperature_vector).flatten()
1044 system_matrix = self.system_matrix
1045 time_symbols = self.get_time_dependent_symbols()
1046 if time_symbols:
1047 system_matrix = SparseSymbolicEvaluator(system_matrix, time_symbols)
1049 system_handler_kwargs = {
1050 "system_matrix": system_matrix,
1051 "rc_solution": self.rc_solution,
1052 "settings": self.settings,
1053 "print_progress": print_progress,
1054 "print_points": np.linspace(t_span[0], t_span[-1], 20 + 1),
1055 "batch_end": t_span[-1],
1056 "time_dependent_function": time_dependent_function,
1057 }
1059 # create the time_steps to solve the results (every X seconds a value)
1060 t_eval = np.linspace(
1061 t_span[0], t_span[1], max(1, int((t_span[1] - t_span[0]) / self.settings.save_all_x_seconds) + 1)
1062 )
1064 if self.inhomogeneous_system:
1065 input_matrix = self.input_matrix
1066 if time_symbols:
1067 input_matrix = SparseSymbolicEvaluator(input_matrix, time_symbols)
1068 input_vector = self.input_vector
1070 # Each Input can be called as a function to calculate new boundary values during simulation:
1071 functions_list = self.rc_objects.inputs
1072 # Also each Input has some functions that can be called independently of the object that return values for
1073 # the calculation.
1074 kwargs_functions = {}
1075 for item in self.rc_objects.inputs:
1076 kwargs_functions.update(item.get_kwargs_functions()) # Values should be overwritten so that the same
1077 # values are calculated only once
1079 system_handler_kwargs.update(
1080 {
1081 "input_matrix": input_matrix,
1082 "input_vector": input_vector,
1083 "functions_list": functions_list,
1084 "kwargs_functions": kwargs_functions,
1085 "t_eval": t_eval,
1086 "first_time": t_span[0],
1087 }
1088 )
1090 # only update the keys that are in the predefined dictionary (it would also work if everything is passed anyway)
1091 system_handler_kwargs.update({k: kwargs[k] for k in system_handler_kwargs if k in kwargs})
1093 system_handler = self.system_handler_type(**system_handler_kwargs)
1094 save_prefix = f"{self.hash}"
1095 if name_add_on:
1096 save_prefix += f"_{name_add_on.removeprefix('_').removesuffix('_')}"
1097 solve_handler = SolveIVPHandler(
1098 system_handler=system_handler,
1099 save_prefix=save_prefix,
1100 solve_settings=self.settings.solve_settings,
1101 )
1102 if hook_function is not None:
1103 hook_function()
1104 solve_handler.solve(
1105 t_span=t_span,
1106 y0=temperature_vector,
1107 t_eval=t_eval,
1108 continued_simulation=continued_simulation,
1109 expected_solution_size_mb=expected_solution_size_mb,
1110 )
1111 return None
1113 @property
1114 def system_handler_type(self):
1115 """
1116 Returns the SystemHandler type depending on which system (in/homogeneous) is needed.
1118 Returns
1119 -------
1120 type :
1121 The system handler type.
1122 """
1123 if self.inhomogeneous_system:
1124 return InhomogeneousSystemHandler
1125 return HomogeneousSystemHandler
1127 @property
1128 def all_objects(self) -> list:
1129 return [*(self.rc_objects.resistors or []), *(self.rc_objects.nodes or []), *(self.rc_objects.inputs or [])]
1131 @property
1132 def objects_with_identical_symbols(self) -> list:
1133 """
1134 Get all objects with symbols that are relevant for the system matrix.
1136 You cannot just use every object, because some (e.g. Resistors) can be summarized in local groups with one
1137 symbol representing the whole group, like the equivalent resistance of multiple connected Resistors.
1139 No `InternalHeatSource` s are returned, because their symbol is already represented in the corresponding Node.
1141 Returns
1142 -------
1143 list :
1144 All network objects without the ones that are already taken into account by the symbols of another.
1145 The list has the same order every time it is created.
1146 """
1148 def filter_resistors(resistor_list):
1149 # Create set of all resistors for O(1) lookups
1150 all_resistors = {obj for obj in resistor_list if isinstance(obj, Resistor)}
1151 to_exclude = set()
1153 # Collect all resistors to exclude in order of appearance
1154 for obj in resistor_list:
1155 if isinstance(obj, Resistor) and obj not in to_exclude:
1156 inbetween = set(obj.all_resistors_inbetween) - {obj}
1157 to_exclude.update(inbetween & all_resistors)
1159 # Filter maintaining original order
1160 return [obj for obj in resistor_list if not obj in to_exclude]
1162 result = [rc_obj for rc_obj in self.all_objects if not isinstance(rc_obj, InternalHeatSource)]
1163 return filter_resistors(result)
1165 def get_symbols(self):
1166 """
1167 Returns a list with all symbols, except time dependent symbols.
1169 Returns
1170 -------
1171 list :
1172 All symbols, except time dependent symbols.
1173 """
1174 all_symbols = [symb for rc_object in self.objects_with_identical_symbols for symb in rc_object.symbols]
1175 return all_symbols
1177 def get_values(self):
1178 """
1179 Returns a list with all values, except of time dependent symbols.
1181 Returns
1182 -------
1183 list :
1184 All values, except of time dependent symbols.
1185 """
1186 all_values = [
1187 val if isinstance(val, Basic) else np.float64(val)
1188 for rc_object in self.objects_with_identical_symbols
1189 for val in rc_object.values
1190 ]
1191 return all_values
1193 def get_time_dependent_symbols(self) -> list:
1194 """
1195 A list with all time dependent symbols but each of it is only added once.
1197 Returns
1198 -------
1199 list :
1200 The time dependent symbols.
1201 """
1202 symbols = []
1203 seen = set()
1204 for rc_object in self.objects_with_identical_symbols:
1205 for symb in rc_object.time_dependent_symbols:
1206 if symb not in seen:
1207 seen.add(symb)
1208 symbols.append(symb)
1209 return symbols
1211 def get_symbols_and_values(self) -> tuple[list, list]:
1212 """
1213 Return two lists of first: all symbols and second: all associated values of all rc objects.
1215 Returns
1216 -------
1217 tuple[list, list] :
1219 """
1220 return self.get_symbols(), self.get_values()
1222 def make_system_matrices(self):
1223 """
1224 Creates the Jacobian matrices of the RC Network.
1226 If self.load_from_pickle the whole network will be loaded from pickle file if it exists to prevent heavy
1227 calculations.
1229 Returns
1230 -------
1231 sympy expression :
1232 The system matrix as sympy expression.
1233 """
1234 success = False
1235 if self.load_from_pickle:
1236 try:
1237 success = self.load_matrices()
1238 except Exception:
1239 pass
1240 if not success:
1241 # fill system matrix
1242 terms = []
1243 involved_symbols: list[set] = []
1244 temperature_symbols: list[set] = []
1245 variables = []
1246 for node in self.nodes:
1247 # works only for one term
1248 term, t_symbols, all_symbols = node.temperature_derivative_term()
1249 terms.append(term)
1250 involved_symbols.append(all_symbols)
1251 temperature_symbols.append(t_symbols)
1252 variables.append(node.temperature_symbol)
1254 self._system_matrix_symbol = build_jacobian(
1255 terms, variables, temperature_symbols, num_cores=self.num_cores_jacobian
1256 )
1257 print("system matrix created")
1259 self.save_matrices()
1261 input_variables = list(self.input_vector_symbol)
1262 self._input_matrix_symbol = build_jacobian(
1263 terms, input_variables, involved_symbols, num_cores=self.num_cores_jacobian
1264 )
1265 print("input matrix created")
1267 self.save_matrices()
1269 def check_courant(self, time_step: float):
1270 """
1271 Calculates the Courant number for the whole network and raises an error if its larger than 1.
1273 Parameters
1274 ----------
1275 time_step : float
1276 The maximum time step in seconds.
1278 Raises
1279 ------
1280 HighCourantNumberError :
1281 If the courant number is greater than 1 the network will calculate shit.
1283 """
1284 # TODO: Do not use the courant number but return a critical maximum time_step to easy raise an error during
1285 # iterations. So: no iteration, but calculate the maximum time_step and return this so that it can be set.
1286 mass_flow_nodes = self.rc_objects.mass_flow_nodes
1287 mass_flow_node: MassFlowNode
1288 for mass_flow_node in mass_flow_nodes:
1289 if mass_flow_node.courant_number(time_step) > 1:
1290 raise HighCourantNumberError
1292 def matrix_to_latex_diag(self):
1293 matrix = self.system_matrix_symbol
1294 diag_elements = matrix.diagonal()
1295 matrix_no_diag = matrix - diag(*diag_elements)
1297 matrix_latex = latex(matrix_no_diag)
1298 diag_latex = latex(Matrix(diag_elements).reshape(len(diag_elements), 1))
1300 return f"{matrix_latex} + \\mathrm{{diag}}\\left({diag_latex}\\right)"