Coverage for pyrc \ core \ network.py: 71%
397 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-13 16:59 +0200
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-13 16:59 +0200
1# -------------------------------------------------------------------------------
2# Copyright (C) 2026 Joel Kimmich, Tim Jourdan
3# ------------------------------------------------------------------------------
4# License
5# This file is part of PyRC, distributed under GPL-3.0-or-later.
6# ------------------------------------------------------------------------------
8from __future__ import annotations
9import os
10import pickle
11from abc import abstractmethod
12from copy import copy
13from typing import Any, TYPE_CHECKING, Callable
15import numpy as np
16from sympy import lambdify, latex, SparseMatrix, Matrix, diag, Symbol, Basic, ImmutableSparseMatrix, MutableSparseMatrix
17import sympy as sym
18import hashlib
19import networkx as nx
20from scipy.sparse import coo_matrix
22from pyrc.paths import run_folder
23from pyrc.core.components.resistor import Resistor
24from pyrc.core.inputs import InternalHeatSource
25from pyrc.core.settings import initial_settings, Settings
26from pyrc.core.solver.symbolic import SparseSymbolicEvaluator
27from pyrc.core.solver.handler import InhomogeneousSystemHandler, SolveIVPHandler, HomogeneousSystemHandler
28from pyrc.tools.errors import HighCourantNumberError
29from pyrc.tools.functions import add_leading_underscore
31from pyrc.tools.science import build_jacobian
32from pyrc.core.components.templates import RCObjects, RCSolution, EquationItem, initial_rc_objects, Cell
33from pyrc.core.components.templates import solution_object
34from pyrc.core.components.capacitor import Capacitor
36if TYPE_CHECKING:
37 from pyrc.core.nodes import MassFlowNode
38 from pyrc.core.components.templates import EquationItemInput
39 from scipy.sparse import spmatrix, sparray
42class RCNetwork:
43 def __init__(
44 self,
45 save_to_pickle: bool = True,
46 load_from_pickle: bool = True,
47 load_solution: bool = True,
48 num_cores_jacobian: int = 1, # TODO: Parallelization only works on linux currently (because it forks instead
49 # of spawning it). Problem is: Sympy symbols are not picklable.
50 settings: Settings = initial_settings,
51 rc_objects: RCObjects = initial_rc_objects,
52 rc_solution: RCSolution = solution_object,
53 # strategy: str = "lambdify",
54 continue_canceled: bool = True,
55 ):
56 """
58 Parameters
59 ----------
60 save_to_pickle
61 load_from_pickle
62 load_solution
63 num_cores_jacobian
64 settings
65 rc_objects
66 rc_solution
67 continue_canceled : bool, optional
68 If True, continue canceled simulations.
69 Overwrites load_solution
70 """
71 self.rc_objects: RCObjects = rc_objects
72 self.settings = settings
73 self._system_matrix_symbol: MutableSparseMatrix = None
74 self._input_matrix_symbol: MutableSparseMatrix = None
75 self._temperature_vector_symbol = None
76 self._input_vector_symbol = None
77 self._input_matrix = None
78 self._system_matrix = None
79 self.__hash = None
80 self.rc_solution: RCSolution = rc_solution
81 # link rc objects to rc solution
82 self.rc_solution.rc_objects = rc_objects
83 # self.strategy = strategy
85 # For each time dependent symbol the following values are not allowed because they would make an entry in the
86 # system matrix be infinity
87 self.forbidden_values: dict[Symbol, set] = {}
89 self.save_to_pickle: bool = save_to_pickle
90 self.load_from_pickle: bool = load_from_pickle
91 self.load_solution: bool = load_solution
92 self.continue_canceled: bool = continue_canceled
94 if num_cores_jacobian is None:
95 num_cores_jacobian = os.cpu_count()
96 self.num_cores_jacobian = min(max(num_cores_jacobian, 1), os.cpu_count())
98 self.groups: dict[str:Capacitor] = {}
100 def __getattr__(self, item):
101 if getattr(self.rc_objects, item, None):
102 return self.rc_objects.__getattribute__(item)
103 return AttributeError
105 def create_network(self, *args, **kwargs):
106 EquationItem.reset_counter()
107 Resistor.reset_counter()
108 self.rc_objects.wipe_all()
109 self._create_network(*args, **kwargs)
110 assert self.nodes is not None
112 @abstractmethod
113 def _create_network(self, *args, **kwargs):
114 pass
116 def copy_dict(self):
117 return {
118 "save_to_pickle": self.save_to_pickle,
119 "load_from_pickle": self.load_from_pickle,
120 "load_solution": self.load_solution,
121 "num_cores_jacobian": self.num_cores_jacobian,
122 "settings": copy(self.settings),
123 }
125 def __copy__(self):
126 return RCNetwork(**self.copy_dict())
128 @property
129 def nodes(self):
130 return self.rc_objects.nodes
132 @property
133 def hash(self):
134 """
135 Compute a deterministic hash for the `RCNetwork` where node identity (class name + obj.id) and topology matter.
137 For Capacitor instances, an optional internal_heat_source (identified by its id) is encoded as a node attribute.
139 For Cell subclass instances, position and delta arrays are encoded.
141 Returns
142 -------
143 str :
144 SHA-256 hex digest of the canonical edge and node representation.
145 """
146 if self.__hash is None:
148 def node_key(_obj) -> str:
149 return f"{type(_obj).__name__}:{_obj.id}"
151 def array_str(arr: np.ndarray) -> str:
152 return ",".join(f"{v:.17g}" for v in arr.ravel())
154 graph = nx.Graph()
155 for obj in self.rc_objects.all:
156 attrs = {}
157 if isinstance(obj, Capacitor):
158 attrs["heat_source_id"] = (
159 obj.internal_heat_source.id if obj.internal_heat_source is not None else None
160 )
161 if isinstance(obj, Cell):
162 attrs["position"] = array_str(obj.position)
163 attrs["delta"] = array_str(obj.delta)
164 graph.add_node(node_key(obj), **attrs)
166 # loop over all linked/connected network objects
167 for obj in self.rc_objects.all:
168 for neighbour in obj.neighbours:
169 graph.add_edge(node_key(obj), node_key(neighbour))
171 node_str = "|".join(
172 f"{nid}#{data}"
173 for nid, data in sorted((nid, str(sorted(data.items()))) for nid, data in graph.nodes(data=True))
174 )
175 edge_str = "|".join(sorted(f"{min(u, v)}-{max(u, v)}" for u, v in graph.edges()))
176 canonical = f"nodes:{node_str};edges:{edge_str}"
177 self.__hash = hashlib.sha256(canonical.encode()).hexdigest()
178 return self.__hash
180 @property
181 def time_steps(self) -> list:
182 """
183 Returns a list with all time steps.
185 Returns
186 -------
187 list :
188 The time steps.
189 """
190 return self.rc_solution.time_steps
192 def reset_properties(self):
193 self._temperature_vector_symbol = None
195 @property
196 def inputs(self) -> list[EquationItemInput]:
197 return self.rc_objects.inputs
199 def change_static_dynamic(self, calculate_static: bool | str):
200 if isinstance(calculate_static, str):
201 match calculate_static.lower():
202 case "static" | "statisch" | "s":
203 calculate_static = True
204 case _:
205 calculate_static = False
206 self.settings.calculate_static = calculate_static
208 def get_node_by_id(self, _id: str | int):
209 """
210 Returns the node with the given id.
212 Parameters
213 ----------
214 _id : str | int
215 The id of the node.
217 Returns
218 -------
219 TemperatureNode | InternalHeatSource :
220 """
221 if isinstance(_id, str):
222 _id = int(_id)
223 return next((x for x in self.rc_objects.all_equation_objects if x.id == _id), None)
225 def no_infinity(self, sp_matrix: SparseMatrix | spmatrix | sparray) -> bool:
226 """
227 Checks the given sparse matrix for infinity entries and symbolic singularities.
229 Notes
230 -----
231 Saves a dict, which maps each free sympy symbol to a set of values at which at least one symbolic entry
232 diverges. Empty if no symbolic entries exist.
234 Parameters
235 ----------
236 sp_matrix : SparseMatrix | spmatrix | sparray
237 A sparse matrix/vector to check, may contain numeric values or sympy
238 expressions.
240 Returns
241 -------
242 tuple[bool, dict] :
243 bool : True if no numeric infinity was found.
244 """
245 sp_matrix: SparseMatrix | spmatrix | sparray
246 if isinstance(sp_matrix, SparseMatrix) or isinstance(sp_matrix, ImmutableSparseMatrix):
247 entries = sp_matrix.todok().values()
248 else:
249 sp_matrix: spmatrix | sparray
250 entries = sp_matrix.data
252 for entry in entries:
253 if isinstance(entry, Basic):
254 if not entry.free_symbols:
255 if entry in (sym.oo, -sym.oo, sym.zoo, sym.nan):
256 return False
257 else:
258 _, denominator = sym.fraction(sym.cancel(entry))
259 if denominator != sym.Integer(1):
260 for s in denominator.free_symbols:
261 solution = sym.solve(denominator, s)
262 self.forbidden_values.setdefault(s, set()).update(solution)
263 else:
264 if np.isinf(entry):
265 return False
267 return True
269 def _get_matrix_symbol(self, key: str) -> SparseMatrix | ImmutableSparseMatrix:
270 """
271 Returns the desired matrix in symbolic notation.
273 Parameters
274 ----------
275 key : str
276 Either "symbol" or "input" to determine the matrix type.
278 Returns
279 -------
280 SparseMatrix | ImmutableSparseMatrix :
281 The (sympy) symbolic notation of the matrix.
282 """
283 attr = f"_{key}_matrix_symbol"
284 if getattr(self, attr) is None:
285 self.make_system_matrices()
286 return getattr(self, attr)
288 def _build_matrix_function(self, key: str) -> Callable:
289 """
290 Builds the desired matrix lambda function using the symbols matrix, respectively.
292 Parameters
293 ----------
294 key : str
295 Either "symbol" or "input" to determine the matrix type.
297 Returns
298 -------
299 Callable :
300 The function of the matrix.
301 """
302 matrix_symbol = self._get_matrix_symbol(key)
303 all_symbols = self.get_symbols()
304 if self.get_time_dependent_symbols():
305 ordered_symbols = [
306 next((s for s in matrix_symbol.free_symbols if s.name == symbol.name), symbol) for symbol in all_symbols
307 ]
308 return lambdify(ordered_symbols, matrix_symbol, "sympy")
309 return lambdify(all_symbols, matrix_symbol, "scipy")
311 def _build_matrix_direct(self, key: str) -> spmatrix:
312 """
313 Bypasses lambdify by substituting all symbol values via name-based xreplace.
315 Parameters
316 ----------
317 key : str
318 Either "symbol" or "input" to determine the matrix type.
320 Notes
321 -----
322 The replace must be done using the name of the symbols and not the objects itself because they might be loaded
323 from disk and might not match, but the names do.
325 Returns
326 -------
327 scipy.spmatrix :
328 A scipy sparse matrix with all values substituted.
329 """
330 matrix_symbol = self._get_matrix_symbol(key)
331 name_to_value = {s.name: v for s, v in zip(self.get_symbols(), self.get_values())}
332 free = matrix_symbol.free_symbols
333 substitution_map = {s: name_to_value[s.name] for s in free if s.name in name_to_value}
335 dok = matrix_symbol.todok()
336 rows, cols, data = [], [], []
337 for (i, j), expr in dok.items():
338 rows.append(i)
339 cols.append(j)
340 data.append(float(expr.xreplace(substitution_map)))
342 return coo_matrix((data, (rows, cols)), shape=matrix_symbol.shape).tocsr()
344 # import warnings
345 # import logging
346 # import pickle
347 # from concurrent.futures import ProcessPoolExecutor
348 # from pathlib import Path
349 # from typing import Callable
350 #
351 # import numba
352 # import numpy as np
353 # from scipy.sparse import coo_matrix, csr_matrix
354 # from sympy import lambdify
355 # from sympy.printing import ccode
356 #
357 # def _xreplace_element(args: tuple) -> object:
358 # """Apply xreplace to a single expression."""
359 # expr, static_map = args
360 # return expr.xreplace(static_map)
361 #
362 # def _build_c_source(reduced_exprs: list, td_ordered: list, func_name: str) -> str:
363 # """
364 # Generates a C source file with a single function iterating over all nonzero
365 # expressions.
366 #
367 # Parameters
368 # ----------
369 # reduced_exprs : list
370 # Sympy expressions with static symbols already substituted.
371 # td_ordered : list
372 # Ordered time-dependent sympy symbols.
373 # func_name : str
374 # Name of the generated C function.
375 #
376 # Returns
377 # -------
378 # str :
379 # C source code as a string.
380 # """
381 # args = ", ".join(f"double {s.name}" for s in td_ordered)
382 # lines = [
383 # "#include <stddef.h>",
384 # f"void {func_name}({args}, double *out, size_t n) {{",
385 # ]
386 # for i, expr in enumerate(reduced_exprs):
387 # lines.append(f" out[{i}] = {ccode(expr)};")
388 # lines.append("}")
389 # return "\n".join(lines)
390 #
391 # def _compile_c_function(
392 # source: str,
393 # func_name: str,
394 # n_out: int,
395 # cache_path: Path,
396 # ) -> Callable:
397 # """
398 # Compiles C source to a shared library and returns a callable via ctypes.
399 #
400 # Parameters
401 # ----------
402 # source : str
403 # C source code string.
404 # func_name : str
405 # Name of the C function to load.
406 # n_out : int
407 # Number of output values (nnz).
408 # cache_path : Path
409 # Path to write the compiled .so / .dll.
410 #
411 # Returns
412 # -------
413 # Callable :
414 # Python callable wrapping the compiled C function.
415 # """
416 # import ctypes
417 # import subprocess
418 # import tempfile
419 #
420 # src_path = cache_path.with_suffix(".c")
421 # src_path.write_text(source)
422 #
423 # result = subprocess.run(
424 # ["gcc", "-O3", "-shared", "-fPIC", "-o", str(cache_path), str(src_path)],
425 # capture_output=True,
426 # text=True,
427 # )
428 # if result.returncode != 0:
429 # raise RuntimeError(result.stderr)
430 #
431 # lib = ctypes.CDLL(str(cache_path))
432 # fn = getattr(lib, func_name)
433 # fn.restype = None
434 #
435 # out_type = ctypes.c_double * n_out
436 #
437 # def c_callable(*td_vals: float) -> np.ndarray:
438 # out = out_type()
439 # fn(*[ctypes.c_double(v) for v in td_vals], out, ctypes.c_size_t(n_out))
440 # return np.frombuffer(out, dtype=np.float64).copy()
441 #
442 # return c_callable
444 # def _prepare_matrix_ingredients(
445 # self, key: str, parallel: bool
446 # ) -> tuple[list, list, np.ndarray, np.ndarray, tuple, list]:
447 # """
448 # Extracts and reduces nonzero expressions, substituting all static symbols.
449 #
450 # Parameters
451 # ----------
452 # key : str
453 # Either "symbol" or "input" to determine the matrix type.
454 # parallel : bool
455 # If True, parallelizes static substitution via ProcessPoolExecutor.
456 #
457 # Returns
458 # -------
459 # tuple :
460 # td_ordered, reduced_exprs, rows, cols, shape, raw exprs (unreduced)
461 # """
462 # matrix_symbol = self._get_matrix_symbol(key)
463 # all_symbols = self.get_symbols()
464 # all_values = self.get_values()
465 # td_symbols = self.get_time_dependent_symbols()
466 # td_names = {s.name for s in td_symbols}
467 #
468 # static_map = {
469 # s_mat: val
470 # for sym, val in zip(all_symbols, all_values)
471 # if sym.name not in td_names
472 # for s_mat in matrix_symbol.free_symbols
473 # if s_mat.name == sym.name
474 # }
475 #
476 # td_ordered = [
477 # next(s for s in matrix_symbol.free_symbols if s.name == td.name)
478 # for td in td_symbols
479 # if any(s.name == td.name for s in matrix_symbol.free_symbols)
480 # ]
481 #
482 # dok = matrix_symbol.todok()
483 # shape = matrix_symbol.shape
484 # keys, exprs = zip(*dok.items()) if dok else ([], [])
485 # rows = np.array([k[0] for k in keys], dtype=np.int32)
486 # cols = np.array([k[1] for k in keys], dtype=np.int32)
487 #
488 # TODO: do not do it like that, it is worse than the solution that already exists.
489 # if parallel:
490 # tasks = [(expr, static_map) for expr in exprs]
491 # with ProcessPoolExecutor() as executor:
492 # reduced_exprs = list(executor.map(_xreplace_element, tasks))
493 # else:
494 # reduced_exprs = [expr.xreplace(static_map) for expr in exprs]
495 #
496 # return td_ordered, reduced_exprs, rows, cols, shape
497 #
498 # TODO: This is worse than the current solution, but the other version (numby/c...) should be implemented to speed
499 # things up. But attention: Currently, it is just vibe coded...
500 # def _build_matrix_function_lambdify(
501 # self, key: str, parallel: bool = False
502 # ) -> Callable:
503 # """
504 # Builds the matrix function using lambdify with numpy backend.
505 #
506 # Parameters
507 # ----------
508 # key : str
509 # Either "symbol" or "input" to determine the matrix type.
510 # parallel : bool
511 # If True, parallelizes static symbol substitution.
512 #
513 # Returns
514 # -------
515 # Callable :
516 # Function accepting td symbol values, returning a csr_matrix.
517 # """
518 # td_ordered, reduced_exprs, rows, cols, shape = (
519 # self._prepare_matrix_ingredients(key, parallel)
520 # )
521 #
522 # if not len(rows):
523 # empty = csr_matrix(shape, dtype=float)
524 #
525 # def assembled(*_) -> csr_matrix:
526 # return empty
527 #
528 # return assembled
529 #
530 # data_func = lambdify(td_ordered, reduced_exprs, "numpy")
531 # template = coo_matrix(
532 # (np.ones(len(rows), dtype=float), (rows, cols)), shape=shape
533 # ).tocsr()
534 #
535 # def assembled(*td_vals: float) -> csr_matrix:
536 # template.data[:] = np.asarray(data_func(*td_vals), dtype=float)
537 # return template
538 #
539 # return assembled
540 #
541 # def _build_matrix_function_numba(self, key: str, parallel: bool = False) -> Callable:
542 # """
543 # Builds the matrix function using a numba-JIT compiled callable, with
544 # caching to self.save_folder_path keyed by self.hash.
545 #
546 # Parameters
547 # ----------
548 # key : str
549 # Either "symbol" or "input" to determine the matrix type.
550 # parallel : bool
551 # If True, parallelizes static symbol substitution.
552 #
553 # Returns
554 # -------
555 # Callable :
556 # Function accepting td symbol values, returning a csr_matrix.
557 # """
558 # td_ordered, reduced_exprs, rows, cols, shape = (
559 # self._prepare_matrix_ingredients(key, parallel)
560 # )
561 #
562 # if not len(rows):
563 # empty = csr_matrix(shape, dtype=float)
564 #
565 # def assembled(*_) -> csr_matrix:
566 # return empty
567 #
568 # return assembled
569 #
570 # cache_path = Path(self.save_folder_path) / f"{self.hash}_{key}_numba.pkl"
571 #
572 # if cache_path.exists():
573 # with open(cache_path, "rb") as f:
574 # data_func = pickle.load(f)
575 # else:
576 # data_func = lambdify(td_ordered, reduced_exprs, "numpy")
577 # data_func_nb = numba.njit(data_func, cache=True)
578 # with open(cache_path, "wb") as f:
579 # pickle.dump(data_func_nb, f)
580 # data_func = data_func_nb
581 #
582 # template = coo_matrix(
583 # (np.ones(len(rows), dtype=float), (rows, cols)), shape=shape
584 # ).tocsr()
585 #
586 # def assembled(*td_vals: float) -> csr_matrix:
587 # template.data[:] = np.asarray(data_func(*td_vals), dtype=float)
588 # return template
589 #
590 # return assembled
591 #
592 # def _build_matrix_function_c(self, key: str, parallel: bool = False) -> Callable:
593 # """
594 # Builds the matrix function using a compiled C shared library via ctypes,
595 # falling back to numba if compilation fails.
596 #
597 # Parameters
598 # ----------
599 # key : str
600 # Either "symbol" or "input" to determine the matrix type.
601 # parallel : bool
602 # If True, parallelizes static symbol substitution.
603 #
604 # Returns
605 # -------
606 # Callable :
607 # Function accepting td symbol values, returning a csr_matrix.
608 # """
609 # td_ordered, reduced_exprs, rows, cols, shape = (
610 # self._prepare_matrix_ingredients(key, parallel)
611 # )
612 #
613 # if not len(rows):
614 # empty = csr_matrix(shape, dtype=float)
615 #
616 # def assembled(*_) -> csr_matrix:
617 # return empty
618 #
619 # return assembled
620 #
621 # func_name = f"matrix_func_{self.hash}_{key}"
622 # so_path = Path(self.save_folder_path) / f"{func_name}.so"
623 #
624 # try:
625 # if so_path.exists():
626 # import ctypes
627 # lib = ctypes.CDLL(str(so_path))
628 # fn = getattr(lib, func_name)
629 # n_out = len(rows)
630 # out_type = ctypes.c_double * n_out
631 #
632 # def data_func(*td_vals: float) -> np.ndarray:
633 # out = out_type()
634 # fn(
635 # *[ctypes.c_double(v) for v in td_vals],
636 # out,
637 # ctypes.c_size_t(n_out),
638 # )
639 # return np.frombuffer(out, dtype=np.float64).copy()
640 # else:
641 # source = _build_c_source(reduced_exprs, td_ordered, func_name)
642 # data_func = _compile_c_function(
643 # source, func_name, len(rows), so_path
644 # )
645 # except Exception as e:
646 # print(
647 # f"C compilation failed for '{key}' matrix ({e}), falling back to numba."
648 # )
649 # return self._build_matrix_function_numba(key, parallel=parallel)
650 #
651 # template = coo_matrix(
652 # (np.ones(len(rows), dtype=float), (rows, cols)), shape=shape
653 # ).tocsr()
654 #
655 # def assembled(*td_vals: float) -> csr_matrix:
656 # template.data[:] = np.asarray(data_func(*td_vals), dtype=float)
657 # return template
658 #
659 # return assembled
660 #
661 # def _build_matrix_function(
662 # self, key: str, strategy: str | None = None
663 # ) -> Callable:
664 # """
665 # Dispatcher selecting the matrix function build strategy.
666 #
667 # Parameters
668 # ----------
669 # key : str
670 # Either "symbol" or "input" to determine the matrix type.
671 # strategy : str | None
672 # One of "lambdify", "parallel", "numba", "c". If None, falls back
673 # to self.strategy set at __init__.
674 #
675 # Returns
676 # -------
677 # Callable :
678 # Function accepting td symbol values, returning a csr_matrix.
679 # """
680 # s = strategy if strategy is not None else self.strategy
681 # if s == "lambdify":
682 # return self._build_matrix_function_lambdify(key, parallel=False)
683 # if s == "parallel":
684 # return self._build_matrix_function_lambdify(key, parallel=True)
685 # if s == "numba":
686 # return self._build_matrix_function_numba(key, parallel=False)
687 # if s == "c":
688 # return self._build_matrix_function_c(key, parallel=False)
689 # raise ValueError(f"Unknown strategy '{s}'. Choose from: lambdify, parallel, numba, c.")
690 #
691 # def _build_matrix(
692 # self, key: str, strategy: str | None = None
693 # ) -> csr_matrix:
694 # """
695 # Inputs all values into the desired matrix function.
696 #
697 # Parameters
698 # ----------
699 # key : str
700 # Either "symbol" or "input" to determine the matrix type.
701 # strategy : str | None
702 # Overrides self.strategy if provided.
703 #
704 # Returns
705 # -------
706 # csr_matrix :
707 # The desired matrix.
708 # """
709 # attr = f"_{key}_matrix"
710 # if getattr(self, attr) is None:
711 # if not self.get_time_dependent_symbols():
712 # result = self._build_matrix_direct(key)
713 # else:
714 # result = self._build_matrix_function(key, strategy=strategy)(
715 # *self.get_values()
716 # )
717 # assert self.no_infinity(result), (
718 # f"Infinity detected in {key} matrix. System cannot be solved."
719 # )
720 # setattr(self, attr, result)
721 # return getattr(self, attr)
723 def _build_matrix(self, key: str) -> SparseMatrix | ImmutableSparseMatrix | spmatrix | sparray:
724 """
725 Inputs all values (can be symbolic values) into the desired matrix function.
727 Parameters
728 ----------
729 key : str
730 Either "symbol" or "input" to determine the matrix type.
732 Returns
733 -------
734 SparseMatrix | ImmutableSparseMatrix | spmatrix | sparray :
735 The desired matrix. If no time dependent variables are used, it is a scipy sparse matrix, otherwise it's a
736 sympy SparseMatrix.
737 """
738 attr = f"_{key}_matrix"
739 if getattr(self, attr) is None:
740 if self.get_time_dependent_symbols():
741 result = self._build_matrix_function(key)(*self.get_values())
742 else:
743 result = self._build_matrix_direct(key)
744 assert self.no_infinity(result), f"Infinity detected in {key} matrix. System cannot be solved."
745 setattr(self, attr, result)
746 return getattr(self, attr)
748 @property
749 def system_matrix_symbol(self) -> SparseMatrix | ImmutableSparseMatrix:
750 return self._get_matrix_symbol("system")
752 @property
753 def input_matrix_symbol(self) -> SparseMatrix | ImmutableSparseMatrix:
754 return self._get_matrix_symbol("input")
756 @property
757 def system_matrix_function(self) -> Callable:
758 return self._build_matrix_function("system")
760 @property
761 def input_matrix_function(self) -> Callable:
762 return self._build_matrix_function("input")
764 @property
765 def system_matrix(self) -> SparseMatrix | ImmutableSparseMatrix | spmatrix | sparray:
766 return self._build_matrix("system")
768 @property
769 def input_matrix(self) -> SparseMatrix | ImmutableSparseMatrix | spmatrix | sparray:
770 return self._build_matrix("input")
772 @property
773 def temperature_vector_symbol(self) -> list:
774 if self._temperature_vector_symbol is None:
775 result = [node.temperature_symbol for node in self.nodes]
776 self._temperature_vector_symbol = result
777 return self._temperature_vector_symbol
779 @property
780 def temperature_vector(self) -> np.ndarray:
781 result = [node.temperature for node in self.nodes]
782 return np.array(result).flatten()
784 @property
785 def input_vector(self) -> np.ndarray:
786 result = [n for input_item in self.inputs for n in input_item.values]
787 return np.array(result).flatten()
789 @property
790 def input_vector_symbol(self) -> np.ndarray:
791 if self._input_vector_symbol is None:
792 result = []
793 if self.inputs is not None and self.inputs != []:
794 result = [n for input_item in self.inputs for n in input_item.symbols]
795 self._input_vector_symbol = np.array(result).flatten()
796 return self._input_vector_symbol
798 # @property
799 # def seconds_of_day(self):
800 # """
801 # Returns the seconds that have already passed on the day of ``self.start_time``.
802 #
803 # Returns
804 # -------
805 # float | int :
806 # The passed seconds of the day.
807 # """
808 # return (self.settings.start_date - self.settings.start_date.astype("datetime64[D]")) / np.timedelta64(1, "s")
810 @property
811 def save_folder_path(self):
812 if self.settings.save_folder_path is None:
813 return run_folder
814 return self.settings.save_folder_path
816 @property
817 def pickle_path(self) -> str:
818 filename = f"{self.hash}.pickle"
819 return os.path.normpath(os.path.join(self.save_folder_path, filename))
821 def pickle_path_result(self, t_span, name_add_on=None):
822 name_add_on = add_leading_underscore(name_add_on)
823 filename = f"{self.hash}{name_add_on}_{int(t_span[1])}_result.pickle"
824 return os.path.normpath(os.path.join(self.save_folder_path, filename))
826 @property
827 def pickle_path_single_solution(self) -> str:
828 filename = f"{self.hash}_single_solution.pickle"
829 return os.path.normpath(os.path.join(self.save_folder_path, filename))
831 def pickle_path_solution(self, t_span, name_add_on=None) -> str:
832 name_add_on = add_leading_underscore(name_add_on)
833 filename = f"{self.hash}{name_add_on}_{int(t_span[0])}_{int(t_span[1])}_solution.pickle"
834 return os.path.normpath(os.path.join(self.save_folder_path, filename))
836 def save_matrices(self):
837 if self.save_to_pickle:
838 file_path = self.pickle_path
839 matrices = [
840 self._system_matrix_symbol,
841 self._input_matrix_symbol,
842 self._input_vector_symbol,
843 self._temperature_vector_symbol,
844 self.forbidden_values,
845 ]
846 with open(file_path, "wb") as f:
847 pickle.dump(matrices, f)
849 def load_matrices(self):
850 file_path = self.pickle_path
851 if os.path.exists(file_path):
852 with open(file_path, "rb") as f:
853 matrices = pickle.load(f)
854 self._system_matrix_symbol = matrices[0]
855 self._input_matrix_symbol = matrices[1]
856 self._input_vector_symbol = matrices[2]
857 self._temperature_vector_symbol = matrices[3]
858 self.forbidden_values = matrices[4]
859 print("matrices loaded")
860 return True
861 else:
862 print("No network found nor loaded.")
863 return False
865 def save_last_step_solution(self, pickle_path_single_solution: str = None):
866 """
867 Saves the last time step solution.
868 """
869 if pickle_path_single_solution is None:
870 pickle_path_single_solution = self.pickle_path_single_solution
871 file_path = pickle_path_single_solution
872 self.rc_solution.save_last_step(file_path)
874 def load_initial_values(self, return_bool: bool = False, pickle_path_single_solution: str = None) -> bool:
875 """
876 Loads the last time step solution (temperature vector) and sets it as initial values.
877 """
878 if pickle_path_single_solution is None:
879 pickle_path_single_solution = self.pickle_path_single_solution
880 file_path = pickle_path_single_solution
881 if os.path.exists(file_path):
882 with open(file_path, "rb") as f:
883 solution_tuple = pickle.load(f)
884 temp_vector = solution_tuple[0]
885 input_vector = solution_tuple[1]
886 for node, temp in zip(self.rc_objects.nodes, temp_vector):
887 node.initial_temperature = temp
888 for item, value in zip(self.inputs, input_vector):
889 item.initial_value = value
890 if return_bool:
891 return True
892 else:
893 print("No temperature vector found nor loaded.")
894 if return_bool:
895 return False
897 @property
898 def inhomogeneous_system(self) -> bool:
899 """
900 Returns True if inputs are existing and the network is inhomogeneous. False otherwise.
902 Returns
903 -------
904 bool :
905 """
906 if self.rc_objects.inputs is None or self.rc_objects.inputs == []:
907 return False
908 return True
910 def solve_network(
911 self,
912 t_span: tuple | Any = None,
913 print_progress=False,
914 name_add_on: str = "",
915 check_courant: bool = True,
916 time_dependent_function: Callable = None,
917 hook_function: Callable = None,
918 expected_solution_size_mb=5000,
919 **kwargs: dict[str : int | Any],
920 ):
921 """
922 Solves the network at the given times.
924 If time_vector is None, one second is calculated.
926 Parameters
927 ----------
928 t_span : tuple | Any
929 Interval of integration (t0, tf). The solver starts with t=t0 and integrates until it reaches t=tf.
930 Both t0 and tf must be floats or values interpretable by the float conversion function.
931 print_progress : bool, optional
932 If True, some print outs are made during solving to get the time step that is currently simulated.
933 name_add_on : str, optional
934 Optional add-on to the name of in-between saves that is placed after the hash value (separated by "_").
935 Example save name:
936 42395d3d9f07f06ce9c9cb609b406aa54fc2dc5e8c4d9cc75987d9a02541ad2f_nameAddOn_zw_000001080_h.pickle
937 check_courant : bool, optional
938 If True, self.check_courant(0.4) is run before simulating.
939 time_dependent_function : Callable
940 A function returning the values for all time dependent symbols that are in the matrices in the same order as
941 self.get_time_dependent_symbols().
942 This function is required if time dependent symbols exist.
943 It must return an iterable (e.g. list)
944 hook_function : Callable, optional
945 A function that is run before the network is solved using solve_ivp.
946 Can be used to catch the time before the solving actually starts (e.g. used in `PerformanceTest`\).
947 expected_solution_size_mb : int | float, optional
948 The expected solution size in Megabytes. Is used for RAM-Management in ``SystemHandler.solve()``\.
949 The solution size depends on the size of the RC network and number of saved time steps.
950 kwargs : dict[str: int | Any], optional
951 Passed to SystemHandler.
953 Returns
954 -------
955 scipy.integrate._ivp.ivp.OdeResult :
956 The solution of the network (time and temperatures of all nodes).
957 See: https://docs.scipy.org/doc/scipy/reference/generated/scipy.integrate.solve_ivp.html#solve-ivp
958 """
959 if self.get_time_dependent_symbols():
960 assert time_dependent_function
962 continued_simulation = False
963 if self.load_solution or self.continue_canceled:
964 success = self.rc_solution.load_solution(
965 self.pickle_path_solution(t_span, name_add_on), last_time_step=t_span[-1]
966 )
967 if not success:
968 success = self.rc_solution.load_solution(
969 self.pickle_path_result(t_span, name_add_on), last_time_step=t_span[-1]
970 )
971 if success or (not isinstance(success, bool) and success == 0): # ==0 because 0 could be the last time
972 # step (will never be the case.... -.-)
973 if not isinstance(success, bool):
974 if not self.continue_canceled:
975 self.rc_solution.delete_solution_except_first()
976 print(f"Partial solution detected but not used because continue_canceled is False.")
977 else:
978 print(
979 f"Simulation is started at loaded time step {success} on {success / t_span[-1] * 100:.2f} %."
980 )
981 t_span = (success, t_span[-1])
982 continued_simulation = True
983 else:
984 print("Solution was loaded from file.")
985 return None
987 max_step = 1
988 if check_courant and (self.rc_objects.mass_flow_nodes is not None or self.rc_objects.mass_flow_nodes != []):
989 loop_counter = 0
990 while loop_counter < 1000:
991 try:
992 self.check_courant(max_step) # approximated maximum time step used during iterations.
993 # if no error
994 break
995 except HighCourantNumberError:
996 loop_counter += 1
997 max_step *= 0.99
999 temperature_vector = np.array(self.temperature_vector).flatten()
1000 system_matrix = self.system_matrix
1001 time_symbols = self.get_time_dependent_symbols()
1002 if time_symbols:
1003 system_matrix = SparseSymbolicEvaluator(system_matrix, time_symbols)
1005 system_handler_kwargs = {
1006 "system_matrix": system_matrix,
1007 "rc_solution": self.rc_solution,
1008 "settings": self.settings,
1009 "print_progress": print_progress,
1010 "print_points": np.linspace(t_span[0], t_span[-1], 20+1),
1011 "batch_end": t_span[-1],
1012 "time_dependent_function": time_dependent_function,
1013 }
1015 # create the time_steps to solve the results (every X seconds a value)
1016 t_eval = np.linspace(
1017 t_span[0], t_span[1], max(1, int((t_span[1] - t_span[0]) / self.settings.save_all_x_seconds) + 1)
1018 )
1020 if self.inhomogeneous_system:
1021 input_matrix = self.input_matrix
1022 if time_symbols:
1023 input_matrix = SparseSymbolicEvaluator(input_matrix, time_symbols)
1024 input_vector = self.input_vector
1026 # Each Input can be called as a function to calculate new boundary values during simulation:
1027 functions_list = self.rc_objects.inputs
1028 # Also each Input has some functions that can be called independently of the object that return values for
1029 # the calculation.
1030 kwargs_functions = {}
1031 for item in self.rc_objects.inputs:
1032 kwargs_functions.update(item.get_kwargs_functions()) # Values should be overwritten so that the same
1033 # values are calculated only once
1035 system_handler_kwargs.update(
1036 {
1037 "input_matrix": input_matrix,
1038 "input_vector": input_vector,
1039 "functions_list": functions_list,
1040 "kwargs_functions": kwargs_functions,
1041 "t_eval": t_eval,
1042 "first_time": t_span[0],
1043 }
1044 )
1046 # only update the keys that are in the predefined dictionary (it would also work if everything is passed anyway)
1047 system_handler_kwargs.update({k: kwargs[k] for k in system_handler_kwargs if k in kwargs})
1049 system_handler = self.system_handler_type(**system_handler_kwargs)
1050 save_prefix = f"{self.hash}"
1051 if name_add_on:
1052 save_prefix += f"_{name_add_on.removeprefix('_').removesuffix('_')}"
1053 solve_handler = SolveIVPHandler(
1054 system_handler=system_handler,
1055 save_prefix=save_prefix,
1056 solve_settings=self.settings.solve_settings,
1057 )
1058 if hook_function is not None:
1059 hook_function()
1060 solve_handler.solve(
1061 t_span=t_span,
1062 y0=temperature_vector,
1063 t_eval=t_eval,
1064 continued_simulation=continued_simulation,
1065 expected_solution_size_mb=expected_solution_size_mb,
1066 )
1068 @property
1069 def system_handler_type(self):
1070 """
1071 Returns the SystemHandler type depending on which system (in/homogeneous) is needed.
1073 Returns
1074 -------
1075 type :
1076 The system handler type.
1077 """
1078 if self.inhomogeneous_system:
1079 return InhomogeneousSystemHandler
1080 return HomogeneousSystemHandler
1082 @property
1083 def all_objects(self) -> list:
1084 return [*(self.rc_objects.resistors or []), *(self.rc_objects.nodes or []), *(self.rc_objects.inputs or [])]
1086 @property
1087 def objects_with_identical_symbols(self) -> list:
1088 """
1089 Get all objects with symbols that are relevant for the system matrix.
1091 You cannot just use every object, because some (e.g. Resistors) can be summarized in local groups with one
1092 symbol representing the whole group, like the equivalent resistance of multiple connected Resistors.
1094 No ``InternalHeatSource`` s are returned, because their symbol is already represented in the corresponding Node.
1096 Returns
1097 -------
1098 list :
1099 All network objects without the ones that are already taken into account by the symbols of another.
1100 The list has the same order every time it is created.
1101 """
1103 def filter_resistors(resistor_list):
1104 # Create set of all resistors for O(1) lookups
1105 all_resistors = {obj for obj in resistor_list if isinstance(obj, Resistor)}
1106 to_exclude = set()
1108 # Collect all resistors to exclude in order of appearance
1109 for obj in resistor_list:
1110 if isinstance(obj, Resistor) and obj not in to_exclude:
1111 inbetween = set(obj.all_resistors_inbetween) - {obj}
1112 to_exclude.update(inbetween & all_resistors)
1114 # Filter maintaining original order
1115 return [obj for obj in resistor_list if not obj in to_exclude]
1117 result = [rc_obj for rc_obj in self.all_objects if not isinstance(rc_obj, InternalHeatSource)]
1118 return filter_resistors(result)
1120 def get_symbols(self):
1121 """
1122 Returns a list with all symbols, except time dependent symbols.
1124 Returns
1125 -------
1126 list :
1127 All symbols, except time dependent symbols.
1128 """
1129 all_symbols = [symb for rc_object in self.objects_with_identical_symbols for symb in rc_object.symbols]
1130 return all_symbols
1132 def get_values(self):
1133 """
1134 Returns a list with all values, except of time dependent symbols.
1136 Returns
1137 -------
1138 list :
1139 All values, except of time dependent symbols.
1140 """
1141 all_values = [
1142 val if isinstance(val, Basic) else np.float64(val)
1143 for rc_object in self.objects_with_identical_symbols
1144 for val in rc_object.values
1145 ]
1146 return all_values
1148 def get_time_dependent_symbols(self) -> list:
1149 """
1150 A list with all time dependent symbols but each of it is only added once.
1152 Returns
1153 -------
1154 list :
1155 The time dependent symbols.
1156 """
1157 symbols = []
1158 seen = set()
1159 for rc_object in self.objects_with_identical_symbols:
1160 for symb in rc_object.time_dependent_symbols:
1161 if symb not in seen:
1162 seen.add(symb)
1163 symbols.append(symb)
1164 return symbols
1166 def get_symbols_and_values(self) -> tuple[list, list]:
1167 """
1168 Return two lists of first: all symbols and second: all associated values of all rc objects.
1170 Returns
1171 -------
1172 tuple[list, list] :
1174 """
1175 return self.get_symbols(), self.get_values()
1177 def make_system_matrices(self):
1178 """
1179 Creates the Jacobian matrices of the RC Network.
1181 If self.load_from_pickle the whole network will be loaded from pickle file if it exists to prevent heavy
1182 calculations.
1184 Returns
1185 -------
1186 sympy expression :
1187 The system matrix as sympy expression.
1188 """
1189 success = False
1190 if self.load_from_pickle:
1191 try:
1192 success = self.load_matrices()
1193 except Exception:
1194 pass
1195 if not success:
1196 # fill system matrix
1197 terms = []
1198 involved_symbols: list[set] = []
1199 temperature_symbols: list[set] = []
1200 variables = []
1201 for node in self.nodes:
1202 # works only for one term
1203 term, t_symbols, all_symbols = node.temperature_derivative_term()
1204 terms.append(term)
1205 involved_symbols.append(all_symbols)
1206 temperature_symbols.append(t_symbols)
1207 variables.append(node.temperature_symbol)
1209 self._system_matrix_symbol = build_jacobian(
1210 terms, variables, temperature_symbols, num_cores=self.num_cores_jacobian
1211 )
1212 print("system matrix created")
1214 self.save_matrices()
1216 input_variables = list(self.input_vector_symbol)
1217 self._input_matrix_symbol = build_jacobian(
1218 terms, input_variables, involved_symbols, num_cores=self.num_cores_jacobian
1219 )
1220 print("input matrix created")
1222 self.save_matrices()
1224 def check_courant(self, time_step: float):
1225 """
1226 Calculates the Courant number for the whole network and raises an error if its larger than 1.
1228 Parameters
1229 ----------
1230 time_step : float
1231 The maximum time step in seconds.
1233 Raises
1234 ------
1235 HighCourantNumberError :
1236 If the courant number is greater than 1 the network will calculate shit.
1238 """
1239 # TODO: Do not use the courant number but return a critical maximum time_step to easy raise an error during
1240 # iterations. So: no iteration, but calculate the maximum time_step and return this so that it can be set.
1241 mass_flow_nodes = self.rc_objects.mass_flow_nodes
1242 mass_flow_node: MassFlowNode
1243 for mass_flow_node in mass_flow_nodes:
1244 if mass_flow_node.courant_number(time_step) > 1:
1245 raise HighCourantNumberError
1247 def matrix_to_latex_diag(self):
1248 matrix = self.system_matrix_symbol
1249 diag_elements = matrix.diagonal()
1250 matrix_no_diag = matrix - diag(*diag_elements)
1252 matrix_latex = latex(matrix_no_diag)
1253 diag_latex = latex(Matrix(diag_elements).reshape(len(diag_elements), 1))
1255 return f"{matrix_latex} + \\mathrm{{diag}}\\left({diag_latex}\\right)"