Coverage for pyrc \ core \ components \ templates.py: 64%
684 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-07 08:20 +0200
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-07 08:20 +0200
1# -------------------------------------------------------------------------------
2# Copyright (C) 2026 Joel Kimmich, Tim Jourdan
3# ------------------------------------------------------------------------------
4# License
5# This file is part of PyRC, distributed under GPL-3.0-or-later.
6# ------------------------------------------------------------------------------
8from __future__ import annotations
10import os
11import pickle
12import re
13from abc import abstractmethod, ABC
14from typing import Any, TYPE_CHECKING, Optional
16import numpy as np
17import pandas as pd
18import sympy as sp
20from vpython import box, vector
22from pyrc.core.components.input import Input
23from pyrc.core.visualization.color.color import value_to_rgb
24from pyrc.tools.errors import FixedPositionError, FixedXYError, FixedZError
25from pyrc.tools.functions import is_set
26from pyrc.tools.science import is_numeric
27from pyrc.visualization.vtk_parser import write_static_cells_vtu, write_temperature_vtu
29if TYPE_CHECKING:
30 from pyrc.core.inputs import InternalHeatSource
31 from pyrc.core.resistors import MassTransport
32 from pyrc.core.components.resistor import Resistor
33 from pyrc.core.components.node import TemperatureNode
class RCObjects:
    """
    Container for the objects of an RC network: nodes, resistors and boundaries.

    Besides the three stored lists, two derived lists are built lazily and cached: the input objects
    (see `inputs`) and the internal heat sources (see `internal_heat_sources`). The caches are dropped
    whenever the lists they are derived from are replaced.

    Parameters
    ----------
    nodes : list, optional
        The nodes of the network.
    resistors : list, optional
        The resistors of the network.
    boundaries : list, optional
        The boundaries of the network.
    """

    def __init__(self, nodes: list = None, resistors: list = None, boundaries: list = None):
        self.__nodes = nodes
        self.__resistors: list[Resistor] = resistors
        self.__boundaries = boundaries
        # lazily built caches
        self.__input_objs = None
        self.__internal_heat_sources = None

    @property
    def nodes(self) -> list:
        return self.__nodes

    @property
    def mass_flow_nodes(self):
        """
        Returns all nodes that are an instance of `MassFlowNode`.
        """
        # imported locally, like in the original implementation
        from pyrc.core.nodes import MassFlowNode

        return [n for n in self.nodes if isinstance(n, MassFlowNode)]

    @property
    def resistors(self) -> list[Resistor]:
        return self.__resistors

    @property
    def boundaries(self) -> list:
        return self.__boundaries

    @property
    def inputs(self) -> list[EquationItemInput]:
        """
        Returns the boundaries followed by the internal heat sources.

        The list is built on first access and cached afterwards. As long as no nodes are set, nothing is built
        and the (empty) cache is returned, which is ``None`` initially.
        """
        if self.__input_objs is None and self.nodes is not None:
            result = []
            if self.boundaries is not None:
                result.extend(self.boundaries)
            if self.internal_heat_sources is not None:
                result.extend(self.internal_heat_sources)
            self.__input_objs = result
        return self.__input_objs

    @property
    def all(self) -> list[TemperatureNode | Resistor]:
        """
        Returns the stored nodes, resistors and boundaries (in this order) in one new list.

        `InternalHeatSource` s are not returned!

        Returns
        -------
        list[TemperatureNode | Resistor] :
            All stored nodes, resistors and boundaries. Lists that are ``None`` or empty are skipped.
        """
        result = []
        for item in (self.__nodes, self.__resistors, self.__boundaries):
            if item:
                result.extend(item)
        return result

    @property
    def all_equation_objects(self) -> list[EquationItem]:
        """
        Returns the content of `all` followed by the content of `other_equation_objects`.
        """
        return [*self.all, *self.other_equation_objects]

    @property
    def other_equation_objects(self) -> list[EquationItem]:
        """
        Returns the equation objects that are not part of `all`.

        Until now, these are just the internal heat sources.

        Returns
        -------
        list[InternalHeatSource] :
            The internal heat sources (see `internal_heat_sources`).
        """
        return self.internal_heat_sources

    @property
    def internal_heat_sources(self) -> list[InternalHeatSource]:
        """
        Returns the truthy ``internal_heat_source`` attributes of all nodes.

        The list is built on first access and cached. If no nodes are set, an empty list is returned and nothing
        is cached, so the list is built as soon as nodes become available.
        """
        if self.__internal_heat_sources is None:
            if self.nodes is None:
                # nothing to derive the heat sources from (yet); previously this raised a TypeError
                return []
            self.__internal_heat_sources = [n.internal_heat_source for n in self.nodes if n.internal_heat_source]
        return self.__internal_heat_sources

    def set_lists(self, capacitors: list = None, resistors: list = None, boundaries: list = None):
        """
        Replaces the stored lists. Arguments that are ``None`` leave the corresponding list untouched.

        Parameters
        ----------
        capacitors : list, optional
            The new node list.
        resistors : list, optional
            The new resistor list.
        boundaries : list, optional
            The new boundary list.
        """
        if capacitors is not None:
            assert isinstance(capacitors, list)
            self.__nodes = capacitors
            # the heat sources are derived from the nodes
            self.__internal_heat_sources = None
        if resistors is not None:
            assert isinstance(resistors, list)
            self.__resistors = resistors
        if boundaries is not None:
            assert isinstance(boundaries, list)
            self.__boundaries = boundaries
        if capacitors is not None or boundaries is not None:
            # the inputs are derived from boundaries and nodes, so the cached list is outdated now
            self.__input_objs = None

    @property
    def raw_data(self) -> tuple:
        """
        Returns the tuple ``(nodes, resistors, boundaries, cached inputs)``.
        """
        return (self.__nodes, self.__resistors, self.__boundaries, self.__input_objs)

    def wipe_all(self):
        """
        Deletes every raw data, including the cached lists.
        """
        self.__nodes = None
        self.__resistors = None
        self.__boundaries = None
        self.__input_objs = None
        self.__internal_heat_sources = None

    def get_all_objects(self, variant: type) -> list:
        """
        Returns all objects of `all` that are an instance of the requested variant/type.

        Parameters
        ----------
        variant : type
            The ``type`` of the objects that will be returned.
            Will be used for the ``isinstance()`` match.

        Returns
        -------
        list :
            The list with all objects in the network with requested variant.
        """
        return [element for element in self.all if isinstance(element, variant)]

    def set_loaded_data(self, loaded_objects: RCObjects):
        """
        Replaces all attributes with the ones of the loaded `RCObjects`.

        Parameters
        ----------
        loaded_objects : RCObjects
            The loaded `RCObjects` that should "replace" self / should be used to overwrite self.
        """
        nodes, resistors, boundaries, input_objs = loaded_objects.raw_data
        self.__nodes = nodes
        self.__resistors = resistors
        self.__boundaries = boundaries
        self.__input_objs = input_objs
        # the heat sources are not part of raw_data: rebuild them from the loaded nodes on next access
        self.__internal_heat_sources = None


initial_rc_objects = RCObjects()
class SymbolMixin:
    """
    Mixin for objects that expose sympy symbols together with their values.
    """

    @property
    @abstractmethod
    def symbols(self) -> list:
        """
        Returns a list of all sympy.symbols of the object, except time dependent symbols.

        Must be in the same order as self.values.

        Returns
        -------
        list :
            The list of sympy.symbols.
        """

    @property
    @abstractmethod
    def values(self) -> list:
        """
        Returns a list of all values of all object symbols, except of time dependent symbols.

        Must be in the same order as self.symbols.

        Returns
        -------
        list :
            The list of values.
        """

    @property
    def time_dependent_symbols(self) -> list:
        """
        Returns the free symbols of all entries in self.values that are sympy expressions.

        The list is sorted by the name of the symbols to get a deterministic order.

        Returns
        -------
        list :
            The free symbols of all ``sp.Expr`` values, without duplicates, sorted by name.
        """
        collected = set()
        for entry in self.values:
            # plain numbers carry no symbols, only sympy expressions are inspected
            if isinstance(entry, sp.Expr):
                collected.update(entry.free_symbols)
        return sorted(collected, key=lambda symbol: symbol.name)
class EquationItem(SymbolMixin, ABC):
    """
    Base class of all items that take part in the equation system.

    Every instance gets a running number as ``id``. The counter is stored on `EquationItem` itself, so it is
    shared by all subclasses.
    """

    # number of items created since the last reset
    item_counter = 0

    def __init__(self):
        next_id = EquationItem.item_counter + 1
        EquationItem.item_counter = next_id
        self.id: int = next_id
        self._index = None

    @classmethod
    def reset_counter(cls):
        """
        Sets the shared counter back to 0, regardless of the class the method is called on.
        """
        EquationItem.item_counter = 0
class RCSolution:
    """
    Stores the solution of an RC network and offers saving, loading and export of it.

    The result vectors are held as a 2D array (one row per time step), the time steps as a 1D array and the
    input vectors as a list of arrays (one entry per appended input vector).

    Parameters
    ----------
    rc_objects : RCObjects, optional
        The objects of the network the solution belongs to.
        By default, the module wide `initial_rc_objects` instance is used.
    """

    def __init__(self, rc_objects: RCObjects = initial_rc_objects):
        self.rc_objects = rc_objects
        self.__result_vectors: np.ndarray | Any = None
        self._input_vectors: list = []
        self.time_steps: np.ndarray | Any = None

        # cached DataFrames, see `temperature_vectors_pandas` and `dataframe`
        self._temperature_dataframe = None
        self._dataframe = None

        self.last_saved_timestep_index = 0

    @property
    def input_exists(self) -> bool:
        return bool(self._input_vectors)

    @property
    def inputs(self):
        return self.rc_objects.inputs

    @property
    def nodes(self):
        return self.rc_objects.nodes

    @property
    def input_vectors(self) -> np.ndarray | Any:
        """
        Returns all stored input vectors concatenated along the first axis or None if there are none.
        """
        if self._input_vectors:
            return np.concatenate(self._input_vectors, axis=0)
        return None

    def last_value_input(self, index):
        """
        Returns the value at column ``index`` of the last row of the last stored input vector.
        """
        return self._input_vectors[-1][-1, index]

    def append_to_input(self, new_input_vector: np.ndarray):
        """
        Appends the input vector, reshaped to one row, to the stored input vectors.
        """
        self._input_vectors.append(new_input_vector.reshape(1, -1))

    def delete_last_input(self):
        """
        Removes the last stored input vector, if there is one.
        """
        if self._input_vectors:
            self._input_vectors.pop()

    def delete_solution_except_first(self):
        """
        Deletes every solution except for the first time step.

        The result vectors stay a 2D array and the time steps a 1D array (each with one entry), so that new
        solutions can be concatenated afterwards. Nothing happens for parts of the solution that are not set.
        """
        if self.__result_vectors is not None:
            # a slice (instead of an index) keeps the row dimension; copy to release the large array
            self.__result_vectors = np.atleast_2d(self.__result_vectors)[:1, :].copy()
        self._input_vectors = self._input_vectors[:1]
        if self.time_steps is not None:
            self.time_steps = np.atleast_1d(self.time_steps)[:1].copy()

        self._temperature_dataframe = None
        self._dataframe = None

        self.last_saved_timestep_index = 0

    def save_solution(self, path_with_name_and_ending: str):
        """
        Pickles `raw_data` to the given file and remembers the index of the last saved time step.

        Parameters
        ----------
        path_with_name_and_ending : str
            Where to save the solution.
        """
        with open(path_with_name_and_ending, "wb") as f:
            pickle.dump(self.raw_data, f)
        if self.time_steps is not None:
            self.last_saved_timestep_index = len(self.time_steps) - 1

    def save_to_file_only(self, t: np.ndarray, y: np.ndarray, path_with_name_and_ending: str):
        """
        Only save the passed solution to file and delete the stored input vectors.

        The t and y values are not saved into the solution object to prevent high RAM usage. The input vectors
        are deleted for the same reason: the last ``len(t)`` input vectors are written to the file and the
        stored list is emptied afterwards.

        If ``t`` is None, nothing is saved and nothing is deleted.

        Parameters
        ----------
        t : np.ndarray
            The time step array of the solution.
        y : np.ndarray
            The result array/matrix of the solution.
        path_with_name_and_ending : str
            Where to save the solution.
        """
        if t is None:
            return
        count = len(t)
        # ``[-0:]`` would return the whole list, so an empty time array is handled separately
        inputs = self._input_vectors[-count:] if count else []
        data = [y, inputs, t, None, None]
        with open(path_with_name_and_ending, "wb") as f:
            pickle.dump(data, f)
        # delete the input vector to make space for new one
        self._input_vectors = []

    def save_new_solution(self, path_with_name_and_ending: str):
        """
        Like save_solution, but it only saves the new solution data instead of everything.

        New data is defined as all data that is saved in a time step greater than
        self.last_saved_timestep_index. If no time steps exist, `save_solution` is used instead.

        Parameters
        ----------
        path_with_name_and_ending : str
            Where to save the solution.
        """
        if self.time_steps is None:
            self.save_solution(path_with_name_and_ending)
            return

        # save everything after self.last_saved_timestep_index up to the last time step
        with open(path_with_name_and_ending, "wb") as f:
            pickle.dump(self.raw_data_last(self.last_saved_timestep_index + 1), f)
        self.last_saved_timestep_index = len(self.time_steps) - 1

    def write_paraview_data(
        self,
        folder: str,
        increment: int = None,
        number_of_saved_steps: int = None,
        time_increment: int | float | Any = None,
        use_degrees_celsius: bool = True,
    ):
        """
        Parsing the result data to vtu files that can be used to visualize the result in paraview.

        It is recommended to not generate more than several thousand resolution steps.
        If no increment is given, about 2000 result steps are created.

        The stored solution is not modified by this method.

        Parameters
        ----------
        folder : str
            The folder to save the paraview data in.
        increment : int, optional
            If specified, only the incremental part of the result is parsed (to shorten the write time).
            If None, the increment is calculated so that a maximum of 2000 results are created.
        number_of_saved_steps : int, optional
            Works like increment but instead of walking a fixed step width it calculates the increment using the
            given number to get the number of steps.
            Or: Say, how many steps should be saved (+-1).
            Overwrites the increment parameter.
        time_increment : int | float | Any, optional
            The x'th time that should be saved in seconds.
            If given, the increment parameter is not used.
            Example usage: time_increment = 120
            The result files are created for results every 120 seconds.
        use_degrees_celsius : bool, optional
            If True, 273.15 is subtracted from the results before writing (Kelvin to degree Celsius).
        """
        static_points, static_cells = write_static_cells_vtu(self.nodes, folder)

        if increment is None:
            increment = self.time_steps_count // 2000
        if number_of_saved_steps is not None:
            increment = self.time_steps_count // number_of_saved_steps
        increment = max(1, int(increment))

        if time_increment is not None:
            assert isinstance(time_increment, float) or isinstance(time_increment, int)
            # True for every step that is the first one in its time_increment wide bucket
            mask = np.diff(np.floor((self.time_steps - self.time_steps[0]) / time_increment), prepend=-1).astype(bool)
            print("Time increment is used.")
            result = self.result_vectors[mask, :]
            time_steps = self.time_steps[mask].tolist()
        else:
            print("Manual increment is used.")
            result = self.result_vectors[::increment, :]
            time_steps = self.time_steps[::increment].tolist()

        if use_degrees_celsius:
            # Not in place: the strided slice above is a view on the stored solution, so ``-=`` would
            # silently shift the stored temperatures.
            result = result - 273.15

        write_temperature_vtu(
            result,
            time_steps,
            static_points,
            static_cells,
            folder,
            step_interval=1,  # reducing of the result was already done, so parse every step
        )

    @property
    def raw_data(self):
        """
        Returns ``[result_vectors, input vector list, time_steps, temperature DataFrame cache, DataFrame cache]``.
        """
        return [self.result_vectors, self._input_vectors, self.time_steps, self._temperature_dataframe, self._dataframe]

    def raw_data_last(self, starting_index):
        """
        Like raw_data but only returns the values starting from `starting_index`.

        Parameters
        ----------
        starting_index : int
            The index where to start getting the data from.

        Returns
        -------
        list :
            A list with all raw data starting from `starting_index`.
            Parts that are not set are None (an empty list for the input vectors).
        """
        result_vectors = None
        if self.result_vectors is not None:
            result_vectors = self.result_vectors[starting_index:, :]

        input_vectors = self._input_vectors[starting_index:] if self._input_vectors else []

        time_steps = None
        if self.time_steps is not None:
            time_steps = self.time_steps[starting_index:]

        temperature_dataframe = None
        if self._temperature_dataframe is not None:
            temperature_dataframe = self._temperature_dataframe.iloc[starting_index:]

        dataframe = None
        if self._dataframe is not None:
            dataframe = self._dataframe.iloc[starting_index:]

        return [result_vectors, input_vectors, time_steps, temperature_dataframe, dataframe]

    def delete_solutions(self, confirm=False):
        """
        Use with care! Deletes all data if it's confirm=True.
        """
        if not confirm:
            print("You have to confirm to delete all data.")
            return

        self.__result_vectors = None
        self._input_vectors = []
        self.time_steps = None

        self._temperature_dataframe = None
        self._dataframe = None

        self.last_saved_timestep_index = 0

    def load_solution(self, path_with_name_and_ending: str, save_combined_solution: bool = True, last_time_step=None):
        """
        Loads a pickled solution. If the file is not found it searches for incremental solutions and loads them.

        This method forces the load. That the hash matches the current network is the responsibility of the user.

        The search for an incremental solution is done by using the hash as the start followed by a "_" and
        everything after the hash is used as add-on to the name, except "_result". So if the requested file is:
            fcd7d8e0f79c611c05db6e80457b8c3f0f2a696acb5e213cc0516bed468e9497_normal_static_result.pickle
        it searches for:
            fcd7d8e0f79c611c05db6e80457b8c3f0f2a696acb5e213cc0516bed468e9497_normal_static_0000100_*.pickle
        The number is the time step (see `_load_all_solutions` for the exact pattern).

        Parameters
        ----------
        path_with_name_and_ending : str
            The path where the solution is stored with name and ending.
        save_combined_solution : bool, optional
            If True, the solution is saved in a pickle file if it was concatenated from incremental solutions.
        last_time_step : int | float, optional
            The last time step which defines the complete solution.
            If given, it is checked if the whole solution was loaded or just a part out of it.
            Used, to continue canceled simulations.

        Returns
        -------
        bool | int | float :
            True if a solution was loaded, False if nothing was found.
            If incremental solutions were loaded that end before `last_time_step`, the last loaded time step is
            returned instead of True.
        """
        path_with_name_and_ending = os.path.normpath(path_with_name_and_ending)
        if os.path.exists(path_with_name_and_ending):
            # NOTE(security): pickle executes arbitrary code on load. Only load files from trusted sources.
            with open(path_with_name_and_ending, "rb") as f:
                raw_data = pickle.load(f)
            self._append_or_initialize(raw_data)
            return True

        # try to load incremental data
        folder, rc_hash, name_add_on = self._get_hash_and_folder_from_file_name(path_with_name_and_ending)
        if rc_hash is None:
            return False

        success = self._load_all_solutions(folder, rc_hash, name_add_on)
        solution_is_complete = True
        if last_time_step is not None and success:
            last_loaded_step = self.time_steps[-1]
            if last_loaded_step < last_time_step:
                print(f"Solution loaded {last_loaded_step}/{last_time_step}")
                solution_is_complete = False
                success = last_loaded_step
        if success and save_combined_solution and solution_is_complete:
            self.save_solution(path_with_name_and_ending)
        return success

    def _append_or_initialize(self, raw_data):
        """
        Append new data from raw_data to existing attributes if they are not None,
        otherwise initialize them.

        Parameters
        ----------
        raw_data : tuple
            A tuple containing (result_vectors, input_vectors, time_steps,
            temperature_dataframe, dataframe).
        """
        new_results, new_inputs, new_time_steps, new_temperature_dataframe, new_dataframe = raw_data[:5]

        if new_results is not None:
            if self.__result_vectors is None:
                self.__result_vectors = new_results
            else:
                self.__result_vectors = np.concatenate((self.__result_vectors, new_results), axis=0)

        if new_inputs is not None:
            if self._input_vectors is None:
                self._input_vectors = new_inputs
            else:
                self._input_vectors.extend(new_inputs)

        if new_time_steps is not None:
            if self.time_steps is None:
                self.time_steps = new_time_steps
            else:
                self.time_steps = np.concatenate((self.time_steps, new_time_steps), axis=0)

        if new_temperature_dataframe is not None:
            if self._temperature_dataframe is None:
                self._temperature_dataframe = new_temperature_dataframe
            else:
                self._temperature_dataframe = pd.concat([self._temperature_dataframe, new_temperature_dataframe])

        if new_dataframe is not None:
            if self._dataframe is None:
                self._dataframe = new_dataframe
            else:
                self._dataframe = pd.concat([self._dataframe, new_dataframe])

    def add_to_solution(self, new_t: list, new_y: list[np.ndarray]) -> None:
        """
        Add new solution from solver. Initialize or append/concatenate.

        Arguments that are None or empty are skipped. The cached DataFrames are dropped because they do not
        contain the new data.

        Parameters
        ----------
        new_t : list
            All new time steps in a list.
        new_y : list[np.ndarray]
            All new result vectors (temperature vectors) in a list.
            The arrays are concatenated along axis 1 and transposed, so that each time step becomes one row.
        """
        if new_t is not None and len(new_t) > 0:
            if self.t is None:
                self.t = np.concatenate(new_t)
            else:
                self.t = np.concatenate([self.t, *new_t])

        if new_y is not None and len(new_y) > 0:
            if self.y is None:
                self.y = np.concatenate(new_y, axis=1).T
            else:
                self.y = np.concatenate([self.y.T, *new_y], axis=1).T

        # the cached DataFrames were built from the old data
        self._temperature_dataframe = None
        self._dataframe = None

    def _load_all_solutions(self, save_dir: str, save_prefix: str, hash_add_on: str | Any = None):
        """
        Load all incrementally saved pickled solution files of one network in ascending order of their number.

        A file matches if its name starts with the prefix (hash plus add-on), optionally followed by "_", then a
        number, optionally followed by "_" and/or "s", and ends with ".pickle" or ".pkl".

        Each part of the current solution is replaced if the loaded files contain data for it!

        Parameters
        ----------
        save_dir : str
            Directory where the pickled files are stored. An empty string means the current directory.
        save_prefix : str
            Prefix of the saved files to identify relevant pickles.
        hash_add_on : str | Any, optional
            A string that is added to the hash with a "_" to serve as identifier.

        Returns
        -------
        bool :
            True if at least one matching file was found and loaded, False otherwise.
        """
        if hash_add_on is not None:
            save_prefix += "_" + hash_add_on
        pattern = re.compile(
            rf"{re.escape(save_prefix)}_?(\d+(?:\.\d+)?)(?=_(?:s)?\.pickle$|(?:s)?\.pickle$|_(?:s)?\.pkl$|(?:s)?\.pkl$)"
        )

        # os.path.dirname returns "" for bare file names, which os.listdir does not accept
        search_dir = save_dir or "."
        if not os.path.isdir(search_dir):
            return False

        solutions = []
        for file_name in os.listdir(search_dir):
            match = pattern.match(file_name)
            if match:
                solutions.append((float(match.group(1)), file_name))
        if not solutions:
            return False

        solutions.sort(key=lambda entry: entry[0])

        # collect the five raw data parts of all files first to concatenate each of them only once
        all_data = [[] for _ in range(5)]
        for _, file_name in solutions:
            # NOTE(security): pickle executes arbitrary code on load. Only load files from trusted sources.
            with open(os.path.join(search_dir, file_name), "rb") as f:
                raw_data = pickle.load(f)
            for i, data in enumerate(raw_data):
                if data is not None:
                    all_data[i].append(data)

        if all_data[0]:
            self.__result_vectors = np.concatenate(all_data[0], axis=0)

        if all_data[1]:
            self._input_vectors = []
            for vectors in all_data[1]:
                self._input_vectors.extend(vectors)

        if all_data[2]:
            self.time_steps = np.concatenate(all_data[2], axis=0)

        # The index is kept (like in `_append_or_initialize`): the DataFrames built by this class use the
        # time steps as index, which would be lost with ignore_index=True.
        if all_data[3]:
            self._temperature_dataframe = pd.concat(all_data[3])

        if all_data[4]:
            self._dataframe = pd.concat(all_data[4])

        return True

    @staticmethod
    def _get_hash_and_folder_from_file_name(path: str):
        """
        Extract the folder, the hash and the name add-on from the full path of a saved pickle file.

        The hash is defined as all characters in the filename before the first underscore. If the file name has
        no underscore, the characters before the first "." are used, but only if they are exactly 64 characters.

        The name add-on is everything followed by the hash (without the separator) but without the
        ".pickle"/".pkl" extension and without "_result". If no characters match this, None is returned.

        Examples (shortened hash):
            "abc_normal_static_result.pickle" -> add-on "normal_static"
            "abc_result.pickle"               -> add-on None
            "<64 characters>.pickle"          -> add-on None

        Parameters
        ----------
        path : str
            Full path to the saved pickle file including the file name (with or without ending)

        Returns
        -------
        tuple[str, str | None, str | None]
            A tuple (folder, hash, name add-on).
        """
        folder = os.path.dirname(path)
        filename = os.path.basename(path)

        rc_hash = None
        # the add-on including its leading separator, so that the suffixes can be stripped uniformly
        remainder = None

        head, separator, tail = filename.partition("_")
        if separator:
            rc_hash = head
            remainder = separator + tail
        else:
            head, separator, tail = filename.partition(".")
            if separator:
                if len(head) == 64:
                    rc_hash = head
                remainder = separator + tail

        name_add_on = None
        if remainder is not None:
            remainder = remainder.removesuffix(".pickle")
            remainder = remainder.removesuffix(".pkl")
            remainder = remainder.removesuffix("_result")
            # drop the leading separator; an empty add-on is reported as None
            name_add_on = remainder[1:] or None
        return folder, rc_hash, name_add_on

    @property
    def exist(self) -> bool:
        return self.result_vectors is not None

    @property
    def result_vectors(self) -> np.ndarray | Any:
        return self.__result_vectors

    @result_vectors.setter
    def result_vectors(self, value):
        # Assignments are ignored on purpose (as before). Use the `y` setter or `add_to_solution` to set results.
        pass

    @property
    def time_steps_count(self):
        """
        Returns the number of stored time steps (0 if no time steps are set).
        """
        if self.time_steps is None:
            return 0
        return len(self.time_steps.flatten())

    @property
    def temperature_vectors_pandas(self) -> pd.DataFrame:
        """
        Returns the solution with all node results in one column within a pd.DataFrame.

        The DataFrame is cached. The cache is dropped by `add_to_solution`, `delete_solutions` and
        `delete_solution_except_first`. Assigning to ``self.result_vectors`` has no effect.

        Returns
        -------
        pd.DataFrame :
            The solution. Each column represents the solution of one node. Each row the time step.
            The index of the DataFrame are the time steps.
        """
        if self._temperature_dataframe is None:
            self._temperature_dataframe = pd.DataFrame(self.result_vectors)
            self._temperature_dataframe.index = self.time_steps
        return self._temperature_dataframe

    @property
    def dataframe(self):
        """
        Returns results and inputs in one cached pd.DataFrame.

        The columns are the ids of the nodes followed by the ids of the inputs, the index are the time steps.
        """
        if self._dataframe is None:
            merge = np.concatenate((self.result_vectors, self.input_vectors), axis=1)
            self._dataframe = pd.DataFrame(
                merge, columns=[*[n.id for n in self.nodes], *[i.id for i in self.inputs]], index=self.time_steps
            )
        return self._dataframe

    @property
    def temperature_vectors(self) -> np.ndarray:
        """
        Like temperature_vectors_pandas, but returns a numpy array.

        This value is not cached.

        Returns
        -------
        np.ndarray :
            The solution. Each column represents the solution of one node. Each row the time step.
        """
        return self.result_vectors

    @property
    def t(self):
        """
        Alias for `self.time_steps`.
        """
        return self.time_steps

    @t.setter
    def t(self, value):
        self.time_steps = value

    @property
    def y(self):
        """
        Alias for `self.result_vectors`. In contrast to `result_vectors`, assigning to it sets the results.
        """
        return self.result_vectors

    @y.setter
    def y(self, value):
        assert isinstance(value, np.ndarray)
        self.__result_vectors = value

    def save_last_step(self, file_path):
        """
        Saves a vector with the solution of the last time step.

        The saved data can be set as initial values using `RCNetwork.load_initial_values`

        Parameters
        ----------
        file_path : str | Any
            The file path with name and ending.
        """
        last_solution = self.temperature_vectors[-1, :]
        last_input = self.input_vectors[-1, :]
        with open(file_path, "wb") as f:
            pickle.dump((last_solution, last_input), f)


solution_object = RCSolution()
class ObjectWithPorts:
    """
    Base class for objects that can be connected to each other.

    Every object keeps a list of its neighbours. Connections are always stored on both sides.
    """

    def __init__(self):
        self.__neighbours = []

    @property
    def neighbours(self):
        return self.__neighbours

    @property
    def ports(self):
        """
        Alias for `self.neighbours`.
        """
        return self.__neighbours

    def __iter__(self):
        """
        Iterate over `self.neighbours`.

        Returns
        -------
        ObjectWithPorts :
            The neighbours.
        """
        yield from self.neighbours

    def connect(self, neighbour, direction: tuple | list | np.ndarray | Any = None, node_direction_points_to=None):
        """
        Add the given object/neighbour to the `self.neighbours` list.

        The neighbour itself will connect ``self`` to its neighbours list.
        E.g.: If node2 should be connected to node1, node2's neighbours list appends self.

        The direction is a possibility to set the direction between two connected nodes manually.
        The normalized direction is set on the neighbour if the neighbour is a `Node`. Otherwise it is set on
        ``self``, which must be a `Node` in this case.

        All arguments are checked before anything is changed, so a failing call leaves both neighbours lists
        untouched.

        Parameters
        ----------
        neighbour : ObjectWithPorts
            The neighbour to connect to. It will connect ``self`` to itself.
        direction : tuple | list | np.ndarray, optional
            If not None, a direction is set manually to node_direction_points_to.
            Either none or both node_direction_points_to and direction must be passed.
        node_direction_points_to : TemperatureNode, optional
            If not None, this is the node to which the direction points at.
            Either none or both node_direction_points_to and direction must be passed.
            Must be a TemperatureNode.

        Raises
        ------
        ValueError
            If only one of direction and node_direction_points_to is passed or if the direction has no length.
        """
        if (direction is not None) ^ (node_direction_points_to is not None):
            raise ValueError("Either none or both node_direction_points_at and direction must be passed.")

        direction_owner = None
        unit_direction = None
        if direction is not None:
            from pyrc.core.nodes import Node
            from pyrc.core.components.node import TemperatureNode

            assert isinstance(node_direction_points_to, TemperatureNode)
            direction_array = np.array(direction)
            length = np.linalg.norm(direction_array)
            if length == 0:
                # normalizing would silently create a direction full of NaN
                raise ValueError("The direction must not be the zero vector.")
            unit_direction = direction_array / length
            if not isinstance(neighbour, Node):
                assert isinstance(self, Node), "The direction can only set on Nodes, not TemperatureNodes/Resistors."
                direction_owner = self
            else:
                direction_owner = neighbour

        # The neighbour goes first: if it cannot be connected, self stays unchanged as well.
        neighbour.__single_connect(self)
        self.__neighbours.append(neighbour)

        if direction_owner is not None:
            direction_owner.set_direction(node_direction_points_to, unit_direction)

    def double_connect(self, neighbour1, neighbour2):
        """
        Connects both given neighbours to ``self`` (see `connect`).
        """
        self.connect(neighbour1)
        self.connect(neighbour2)

    def __single_connect(self, neighbour):
        """
        Like `self.connect`, but it doesn't set the connection to the neighbour, too.

        Parameters
        ----------
        neighbour : ObjectWithPorts
            The neighbour to connect to.
        """
        self.__neighbours.append(neighbour)
class ConnectedFlowObject:
    """
    Skeleton for objects that take part in a volume flow.

    Attributes are the (initially unset) volume flow and a manual switch that tells if the flows are balanced.
    """

    def __init__(self):
        # Manual switch to determine if the volume flows are balanced: sum(inflows)-sum(outflows) = 0
        # It should be actuated from the algorithm that distributes the flows or from a method that checks the
        # balance.
        self.volume_flow_is_balanced = False
        self._volume_flow = None

    @property
    def guess_volume_flow(self):
        """
        Returns the stored volume flow (same value as `volume_flow` in this base class).
        """
        return self._volume_flow

    @property
    def volume_flow(self):
        """
        Returns the stored volume flow.
        """
        return self._volume_flow

    @abstractmethod
    def check_balance(self) -> bool:
        """
        Has to be implemented by the subclasses.
        """

    @property
    def sources(self) -> list:
        """
        Returns a new empty list in this base class.
        """
        return list()

    @property
    def sinks(self) -> list:
        """
        Returns a new empty list in this base class.
        """
        return list()

    @property
    @abstractmethod
    def balance(self):
        """
        Has to be implemented by the subclasses.
        """
class Geometric:
    """
    Skeleton for a geometric object that only contains the position in 3D space.

    Defines getter and setter for X, Y and Z coordinates. If a 2D vector is given,
    the Z coordinate is set to 0. The position is always stored as a float64 array.

    Parameters
    ----------
    position : np.ndarray | tuple
        Either 2D or 3D position of the object as array.
    fixed_position : bool
        If ``True``, the position cannot be changed. Overwrites both `fixed_z` and `fixed_xy` parameters.
    fixed_z : bool
        If ``True``, the z coordinate cannot be changed.
    fixed_xy : bool
        If ``True``, the x and y coordinates cannot be changed.
    """

    def __init__(
        self, position: np.ndarray | tuple, fixed_position: bool = False, fixed_z: bool = False, fixed_xy: bool = False
    ):
        if fixed_position:
            fixed_z = fixed_xy = True

        # The position setter refuses to work on fixed objects. So the initial position is set on the
        # still unlocked object and the requested flags are applied afterwards.
        self.fixed_z = False
        self.fixed_xy = False
        self.position = np.array(position, dtype=np.float64)
        self.fixed_z = fixed_z
        self.fixed_xy = fixed_xy

    @property
    def position(self) -> np.ndarray:
        return self.__position

    @position.setter
    def position(self, value: np.ndarray):
        """
        Sets the whole position at once. The passed values are copied.

        Raises
        ------
        FixedPositionError
            If any coordinate of the object is fixed.
        ValueError
            If the position contains NaN.
        """
        if self.fixed_z or self.fixed_xy:
            raise FixedPositionError()
        # np.array always copies and converts every input (also integer arrays) to float64
        value = np.array(value, dtype=np.float64)
        assert 2 <= len(value) <= 3
        if len(value) == 2:
            value = np.array([*value, 0], dtype=np.float64)
        if np.isnan(value).any():
            raise ValueError("The position must not contain NaN.")
        self.__position = value

    @property
    def x(self):
        return self.__position[0]

    @x.setter
    def x(self, value):
        if self.fixed_xy:
            raise FixedXYError()
        assert is_numeric(value)
        self.__position = np.array([value, self.y, self.z], dtype=np.float64)

    @property
    def y(self):
        return self.__position[1]

    @y.setter
    def y(self, value):
        if self.fixed_xy:
            raise FixedXYError()
        assert is_numeric(value)
        self.__position = np.array([self.x, value, self.z], dtype=np.float64)

    @property
    def z(self):
        return self.__position[2]

    @z.setter
    def z(self, value):
        if self.fixed_z:
            raise FixedZError()
        assert is_numeric(value)
        self.__position = np.array([self.x, self.y, value], dtype=np.float64)
class Cell(Geometric):
    def __init__(
        self,
        position: np.ndarray | tuple,
        delta: np.ndarray | tuple = None,
    ):
        """
        Extends the `Geometric` class to a cell with length, height and depth.

        Parameters
        ----------
        position : np.ndarray
            The position of the node in 2D/3D space.
            If 2D, a zero is added for the z coordinate.
        delta : np.ndarray | tuple, optional
            Delta vector [delta_x, delta_y, delta_z].
        """
        # visualize the Cell using vpython
        self.__vbox: Optional[box] = (
            None  # must be initialized before Geometric init is called because of position setter
        )
        self.opacity = 1

        super().__init__(position=position)

        self.delta = delta

    @Geometric.position.setter
    def position(self, value):
        """
        Set the position via the `Geometric` setter and refresh the vbox geometry if a vbox already exists.
        """
        Geometric.position.fset(self, value)
        if self.__vbox is not None:
            self.update_vbox_geometry()

    @property
    def vbox(self):
        """
        Returns the vpython box of this cell. It is created on first access.
        """
        if self.__vbox is None:
            self.vbox = box(
                pos=vector(*self.position),
                size=vector(*self.delta),
                color=vector(0.6, 0.6, 0.6),
                opacity=self.opacity,
                shininess=0.0,
            )
        return self.__vbox

    @vbox.setter
    def vbox(self, value):
        assert isinstance(value, box)
        self.__vbox = value

    @property
    def delta(self):
        """
        Returns the delta vector.

        Returns
        -------
        np.ndarray :
            The delta vector.
        """
        return self.__delta

    @delta.setter
    def delta(self, value):
        """
        Set the delta vector. One or two given elements are padded with ``1.0`` to three elements.

        Raises
        ------
        ValueError
            If the flattened value has no or more than three elements.
        """
        # NOTE(review): ``None`` (the constructor default) is flattened to a one-element object array and
        # therefore ends up as ``[None, 1.0, 1.0]`` - confirm that this is intended.
        value = np.asarray(value).ravel()
        if value.size == 1:
            self.__delta = np.append(value, [1.0, 1.0])
        elif value.size == 2:
            self.__delta = np.append(value, 1.0)
        elif value.size == 3:
            self.__delta = value
        else:
            # one element is accepted above, so the message names the real valid range
            raise ValueError(f"Expected 1 to 3 elements, got {value.size}.")

        if self.__vbox is not None:
            self.update_vbox_geometry()

    @property
    def delta_x(self):
        return self.delta[0]

    @property
    def delta_y(self):
        return self.delta[1]

    @property
    def delta_z(self):
        return self.delta[2]

    @property
    def length(self):
        return self.delta_x

    @property
    def height(self):
        return self.delta_y

    @property
    def depth(self):
        return self.delta_z

    @property
    def boundaries(self) -> list:
        """
        Returns the boundaries of the cell.

        The format looks like:
        [-x, x, -y, y, -z, z]

        Returns
        -------
        list :
            The boundaries.
        """
        negative = (self.position - self.delta / 2).tolist()
        positive = (self.position + self.delta / 2).tolist()
        return [item for pair in zip(negative, positive) for item in pair]

    def update_vbox_geometry(self) -> None:
        """
        Update position/size (geometry) of the vbox (visualization). Call only if geometry changes.
        """
        self.vbox.pos = vector(*self.position)
        self.vbox.size = vector(*self.delta)

    def update_color(
        self, temperature: float, t_min: float = 263.15, t_max: float = 413.15, colormap="managua"
    ) -> None:
        """
        Update the color of the vbox for visualization.

        Parameters
        ----------
        temperature : float
            The temperature in Kelvin to set.
        t_min : float | int, optional
            The minimal temperature for the color code.
        t_max : float | int, optional
            The maximal temperature for the color code.
        colormap : str, optional
            The colormap to use. See pyrc.core.visualization.color.color.py or the txt files in there respectively.
        """
        assert t_max > t_min
        t_norm = (temperature - t_min) / (t_max - t_min)
        r, g, b = value_to_rgb(t_norm, colormap)
        self.vbox.color = vector(r, g, b)

    def _apply_alignment(self, alignment, reference_position, other_deltas):
        """
        Apply alignment string to calculate new position.

        Parameters
        ----------
        alignment : str
            Face alignment specification with optional pairing override.
            Format: Space-separated or consecutive axis specifications.
            Each specification: [self_dir][other_dir]axis
            where self_dir and other_dir are '+' or '-'.
            Default pairing: opposite faces ('+x' pairs with '-x' of other)
            Examples: 'x' (default opposite), 'xy' (both default opposite),
            '+-x' (explicit opposite), '++x' (same face),
            '-y' (self -y with other +y), '+-x -+y' (multiple axes with space)
        reference_position : np.ndarray
            Starting position to update
        other_deltas : np.ndarray
            Delta values of object being placed

        Returns
        -------
        np.ndarray
            Updated position after applying alignment

        Raises
        ------
        ValueError
            If alignment string is malformed
        """
        # Work on a float copy: an integer reference array would silently truncate the computed coordinates.
        new_position = np.array(reference_position, dtype=np.float64)

        # Remove spaces and parse character by character
        alignment_no_space = alignment.replace(" ", "")
        axis_indices = {"x": 0, "y": 1, "z": 2}

        i = 0
        while i < len(alignment_no_space):
            signs = []

            # Parse signs (0, 1, or 2)
            while i < len(alignment_no_space) and alignment_no_space[i] in ["-", "+"]:
                signs.append(-1 if alignment_no_space[i] == "-" else 1)
                i += 1
                if len(signs) > 2:
                    raise ValueError(f"More than 2 signs found before axis at position {i}")

            # Parse axis
            if i >= len(alignment_no_space) or alignment_no_space[i] not in axis_indices:
                if signs:
                    raise ValueError(f"Found direction signs without following axis at position {i}")
                # Without signs the end of the string cannot be reached here, so this is an unknown character.
                # Stopping silently would drop every specification that follows it.
                raise ValueError(f"Unexpected character {alignment_no_space[i]!r} in alignment at position {i}")
            axis_index = axis_indices[alignment_no_space[i]]
            i += 1

            # Apply signs
            if len(signs) == 1:
                self_direction = signs[0]
                other_direction = -signs[0]  # Opposite of self
            elif len(signs) == 2:
                self_direction, other_direction = signs
            else:
                # no sign given: positive face of self meets negative face of other
                self_direction, other_direction = 1, -1

            new_position[axis_index] = (
                self.position[axis_index]
                + self_direction * self.delta[axis_index] / 2
                - other_direction * other_deltas[axis_index] / 2
            )

        return new_position

    def place_adjacent(self, other_cell, alignment):
        """
        Place other_cell adjacent to self aligned at specified face(s).

        Parameters
        ----------
        other_cell : Cell
            Cell to be placed adjacent to self
        alignment : str
            Face alignment specification with optional pairing override.
            Format: Space-separated or consecutive axis specifications.
            Each specification: [self_dir][other_dir]axis
            where self_dir and other_dir are '+' or '-'.
            Default pairing: opposite faces ('+x' pairs with '-x' of other)
            Examples: 'x' (default opposite), 'xy' (both default opposite),
            '+-x' (explicit opposite), '++x' (same face),
            '-y' (self -y with other +y), '+-x -+y' (multiple axes with space)

        Returns
        -------
        Cell :
            The passed ``other_cell`` with its updated position.
        """
        other_cell.position = self._apply_alignment(alignment, other_cell.position, other_cell.delta)

        return other_cell

    def create_adjacent(self, alignment, **kwargs):
        """
        Create and place new cell of same type adjacent to self.

        Parameters
        ----------
        alignment : str
            Face alignment specification with optional pairing override.
            Format: Space-separated or consecutive axis specifications.
            Each specification: [self_dir][other_dir]axis
            where self_dir and other_dir are '+' or '-'.
            Default pairing: opposite faces ('+x' pairs with '-x' of other)
            Examples:\n
            ``x`` (default opposite), ``xy`` (both default opposite),\n
            ``+-x`` (explicit opposite), ``++x`` (same face),\n
            ``-y`` (self -y with other +y), ``+-x -+y`` (multiple axes with space)
        **kwargs
            Arguments passed to constructor (``delta`` etc.)

        Returns
        -------
        Cell or subclass :
            New `Cell` of same type as ``self`` placed adjacent to ``self``
        """
        if "position" not in kwargs:
            kwargs["position"] = self.position.copy()
        new_cell = type(self)(**kwargs)
        return self.place_adjacent(new_cell, alignment)

    @classmethod
    def create_grid(cls, grid_size, delta: np.ndarray | tuple = None, center_position=None, **kwargs) -> np.ndarray:
        """
        Create a 3D grid of cells.

        Parameters
        ----------
        grid_size : tuple of int
            Number of cells (nx, ny, nz).
        delta : np.ndarray | tuple
            Total dimensions of the whole grid in x, y, z direction. It is divided by ``grid_size`` to get the
            delta of a single cell.
        center_position : np.ndarray | tuple, optional
            Center position of the grid. Defaults to origin.
        **kwargs
            Additional arguments passed to constructor.

        Returns
        -------
        np.ndarray
            3D array of shape (nx, ny, nz) containing Cell instances.
        """

        nx, ny, nz = grid_size
        counts = np.array([nx, ny, nz])
        cell_deltas = delta / counts
        # np.array copies and also accepts tuples/lists (``.copy()`` is not available on tuples)
        center = np.zeros(3) if center_position is None else np.array(center_position, dtype=np.float64)
        # index of the grid center in every direction; cell offsets are measured from there
        center_index = (counts - 1) / 2

        cells = np.empty((nx, ny, nz), dtype=object)
        for ix in range(nx):
            for iy in range(ny):
                for iz in range(nz):
                    offset = (np.array([ix, iy, iz]) - center_index) * cell_deltas
                    cells[ix, iy, iz] = cls(position=center + offset, delta=cell_deltas, **kwargs)
        return cells

    def create_grid_aligned(self, alignment, grid_size, total_delta, position=None, **kwargs) -> np.ndarray[Cell]:
        """
        Create a 3D grid of cells aligned to self.

        Parameters
        ----------
        alignment : str
            Face alignment specification. See _apply_alignment for format.
        grid_size : tuple of int
            Number of cells (nx, ny, nz).
        total_delta : float | np.ndarray | tuple
            Total length in x,y,z direction. A single number is used for all three directions.
        position : np.ndarray | tuple, optional
            Base position, updated by alignment. Defaults to origin.
        **kwargs
            Additional arguments passed to constructor.

        Returns
        -------
        np.ndarray
            3D array of shape (nx, ny, nz) containing Cell instances.
        """
        base = np.zeros(3) if position is None else np.array(position, dtype=np.float64)
        # The alignment indexes the deltas per axis, so a scalar has to be expanded to three components first.
        total_deltas = np.broadcast_to(np.asarray(total_delta, dtype=np.float64), (3,))
        center = self._apply_alignment(alignment, base, total_deltas)
        return type(self).create_grid(grid_size, total_delta, center_position=center, **kwargs)
class Material:
    def __new__(cls, *args, **kwargs):
        """
        Refuse to create `Material` itself; only its subclasses can be instantiated.

        Parameters
        ----------
        args
        kwargs

        Raises
        ------
        TypeError
            If `Material` is instantiated directly. The message lists the direct subclasses.
        """
        if cls is not Material:
            return super().__new__(cls)
        child_names = ", ".join(child.__name__ for child in Material.__subclasses__())
        raise TypeError(f"Cannot instantiate {cls.__name__} directly. Use the children: {child_names}")

    def __init__(
        self,
        name,
        density: float | int | np.number = np.nan,
        heat_capacity: float | int | np.number = np.nan,
        thermal_conductivity: float | int | np.number = np.nan,
    ):
        """
        Container to hold all material properties.

        Parameters
        ----------
        name : str
            The name/identifier of the material.
        density : float | int | np.number
            The density of the material in kg/m^3.
        heat_capacity : float | int | np.number
            The heat capacity of the material in J/kg/K.
        thermal_conductivity : float | int | np.number
            The thermal conductivity of the material in W/m/K.
        """
        self.__thermal_conductivity = thermal_conductivity
        self.__heat_capacity = heat_capacity
        self.__density = density
        self.__name = name

    @property
    def name(self):
        """Returns the name/identifier given at construction."""
        return self.__name

    @property
    def density(self):
        """Returns the density given at construction."""
        return self.__density

    @property
    def heat_capacity(self):
        """Returns the heat capacity given at construction."""
        return self.__heat_capacity

    @property
    def thermal_conductivity(self):
        """Returns the thermal conductivity given at construction."""
        return self.__thermal_conductivity
class Fluid(Material):
    def __init__(
        self,
        *args,
        kin_viscosity: float | int | np.number = np.nan,
        prandtl_number: float | int | np.number = np.nan,
        grashof_number: float | int | np.number = np.nan,
        **kwargs,
    ):
        """
        Material with additional fluid properties.

        Parameters
        ----------
        args
            Positional arguments passed to `Material`.
        kin_viscosity : float | int | np.number
            The kinematic viscosity of the material in m^2/s.
        prandtl_number : float | int | np.number
            The Prandtl number of the material without unit.
        grashof_number : float | int | np.number
            The Grashof number of the material without unit.
        kwargs
            Keyword arguments passed to `Material`.
        """
        super().__init__(*args, **kwargs)
        self.__grashof_number = grashof_number
        self.__prandtl_number = prandtl_number
        self.__kin_viscosity = kin_viscosity

    @property
    def kin_viscosity(self):
        """Returns the kinematic viscosity given at construction."""
        return self.__kin_viscosity

    @property
    def prandtl_number(self):
        """
        Returns the Prandtl number.

        If no value is stored (``None`` or not set according to `is_set`), it is calculated as
        ``kin_viscosity * density * heat_capacity / thermal_conductivity`` and stored for later calls.
        """
        stored = self.__prandtl_number
        if stored is not None and is_set(stored):
            return stored
        self.__prandtl_number = self.kin_viscosity * self.density * self.heat_capacity / self.thermal_conductivity
        return self.__prandtl_number

    @property
    def Pr(self):
        """Short alias of `prandtl_number`."""
        return self.prandtl_number

    @property
    def grashof_number(self):
        """Returns the Grashof number given at construction."""
        return self.__grashof_number

    @property
    def Gr(self):
        """Returns the stored Grashof number (short name)."""
        return self.__grashof_number

    @property
    def rayleigh_number(self):
        """Returns the product of Grashof and Prandtl number."""
        grashof = self.grashof_number
        return grashof * self.prandtl_number
class Solid(Material):
    """
    Material subclass that adds no properties of its own.
    """

    def __init__(self, *args, **kwargs):
        # all arguments are forwarded unchanged to `Material`
        super().__init__(*args, **kwargs)
class CompositeMaterialSolid(Solid):
    """
    Combine multiple materials using ratios.
    """

    def __init__(self, name, materials, ratios, **kwargs):
        """
        Build a solid whose properties are the ratio-weighted sums of the given materials.

        Parameters
        ----------
        name : str
            The name/identifier of the composite.
        materials : iterable
            The materials to combine. ``density``, ``heat_capacity`` and ``thermal_conductivity`` are read.
        ratios : iterable
            One ratio per material. The ratios are normalized by their sum.
        **kwargs
            Passed to the parent constructor. They overwrite the calculated properties of the same name.

        Raises
        ------
        ValueError
            If the number of materials and ratios differ.
        """
        # Both inputs are iterated several times, so one-shot iterables are materialized first.
        materials = list(materials)
        ratios = list(ratios)
        if len(materials) != len(ratios):
            # zip() would silently drop the surplus entries while the weights stay normalized over all ratios
            raise ValueError(
                f"Number of materials ({len(materials)}) and ratios ({len(ratios)}) must be equal."
            )

        total_ratio = sum(ratios)
        weights = [ratio / total_ratio for ratio in ratios]

        density = sum(mat.density * weight for mat, weight in zip(materials, weights))
        heat_capacity = sum(mat.heat_capacity * weight for mat, weight in zip(materials, weights))
        thermal_conductivity = sum(mat.thermal_conductivity * weight for mat, weight in zip(materials, weights))

        solid_kwargs = {
            "name": name,
            "density": density,
            "heat_capacity": heat_capacity,
            "thermal_conductivity": thermal_conductivity,
        }
        # explicitly passed values win over the calculated ones
        solid_kwargs.update(kwargs)
        super().__init__(**solid_kwargs)
class CompositeMaterialFluid(Fluid):
    """
    Combine multiple fluid materials using ratios.
    """

    def __init__(self, name, materials, ratios, **kwargs):
        """
        Build a fluid whose properties are the ratio-weighted sums of the given materials.

        Parameters
        ----------
        name : str
            The name/identifier of the composite.
        materials : iterable
            The fluids to combine. ``density``, ``heat_capacity``, ``thermal_conductivity`` and
            ``kin_viscosity`` are read.
        ratios : iterable
            One ratio per material. The ratios are normalized by their sum.
        **kwargs
            Passed to the parent constructor. They overwrite the calculated properties of the same name.

        Raises
        ------
        ValueError
            If the number of materials and ratios differ.
        """
        # Both inputs are iterated several times, so one-shot iterables are materialized first.
        materials = list(materials)
        ratios = list(ratios)
        if len(materials) != len(ratios):
            # zip() would silently drop the surplus entries while the weights stay normalized over all ratios
            raise ValueError(
                f"Number of materials ({len(materials)}) and ratios ({len(ratios)}) must be equal."
            )

        total_ratio = sum(ratios)
        weights = [ratio / total_ratio for ratio in ratios]

        density = sum(mat.density * weight for mat, weight in zip(materials, weights))
        heat_capacity = sum(mat.heat_capacity * weight for mat, weight in zip(materials, weights))
        thermal_conductivity = sum(mat.thermal_conductivity * weight for mat, weight in zip(materials, weights))
        kin_viscosity = sum(mat.kin_viscosity * weight for mat, weight in zip(materials, weights))

        fluid_kwargs = {
            "name": name,
            "density": density,
            "heat_capacity": heat_capacity,
            "thermal_conductivity": thermal_conductivity,
            "kin_viscosity": kin_viscosity,
        }
        # explicitly passed values win over the calculated ones
        fluid_kwargs.update(kwargs)
        super().__init__(**fluid_kwargs)
def calculate_balance_for_resistors(node, resistors: list[MassTransport]):
    """
    Sum up the signed volume flows of the given resistors with respect to ``node``.

    Parameters
    ----------
    node
        The node the balance is calculated for.
    resistors : list[MassTransport]
        The resistors to consider. ``sink`` and ``volume_flow`` are read from each of them.

    Returns
    -------
    The balance: ``volume_flow`` is added for every resistor whose ``sink`` equals ``node`` and
    subtracted for every other resistor. ``0`` if ``resistors`` is empty.
    """
    net_flow = 0
    for transport in resistors:
        flows_into_node = transport.sink == node
        if flows_into_node:
            net_flow += transport.volume_flow
            continue
        # node is not the sink of this resistor: the flow leaves the node
        net_flow -= transport.volume_flow
    return net_flow
class EquationItemInput(Input, EquationItem, ABC):
    """
    Abstract base combining `Input` and `EquationItem`. Adds no members of its own.
    """