Coverage for pyrc \ core \ components \ templates.py: 64%
675 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-13 16:59 +0200
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-13 16:59 +0200
1# -------------------------------------------------------------------------------
2# Copyright (C) 2026 Joel Kimmich, Tim Jourdan
3# ------------------------------------------------------------------------------
4# License
5# This file is part of PyRC, distributed under GPL-3.0-or-later.
6# ------------------------------------------------------------------------------
8from __future__ import annotations
10import os
11import pickle
12import re
13from abc import abstractmethod, ABC
14from typing import Any, TYPE_CHECKING, Optional
16import numpy as np
17import pandas as pd
18import sympy as sp
20from vpython import box, vector
22from pyrc.core.components.input import Input
23from pyrc.core.visualization.color.color import value_to_rgb
24from pyrc.tools.errors import FixedPositionError, FixedXYError, FixedZError
25from pyrc.tools.functions import is_set
26from pyrc.tools.science import is_numeric
27from pyrc.visualization.vtk_parser import write_static_cells_vtu, write_temperature_vtu
29if TYPE_CHECKING:
30 from pyrc.core.inputs import InternalHeatSource
31 from pyrc.core.resistors import MassTransport
32 from pyrc.core.components.resistor import Resistor
33 from pyrc.core.components.node import TemperatureNode
36class RCObjects:
37 def __init__(self, nodes: list = None, resistors: list = None, boundaries: list = None):
38 self.__nodes = nodes
39 self.__resistors: list[Resistor] = resistors
40 self.__boundaries = boundaries
41 self.__input_objs = None
42 self.__internal_heat_sources = None
44 @property
45 def nodes(self) -> list:
46 return self.__nodes
48 @property
49 def mass_flow_nodes(self):
50 from pyrc.core.nodes import MassFlowNode
51 return [n for n in self.nodes if isinstance(n, MassFlowNode)]
53 @property
54 def resistors(self) -> list[Resistor]:
55 return self.__resistors
57 @property
58 def boundaries(self) -> list:
59 return self.__boundaries
61 @property
62 def inputs(self) -> list[EquationItemInput]:
63 if self.__input_objs is None and self.nodes is not None:
64 result = []
65 if self.boundaries is not None:
66 result.extend(self.boundaries)
67 if self.internal_heat_sources is not None:
68 result.extend(self.internal_heat_sources)
69 self.__input_objs = result
70 return self.__input_objs
72 @property
73 def all(self) -> list[TemperatureNode | Resistor]:
74 """
75 Returns all `TemperatureNode`\s and `Resistor`\s (network objects, that are linked together).
77 `InternalHeatSource` s are not returned!
79 Returns
80 -------
81 list[TemperatureNode | Resistor] :
82 Capacitors, BoundaryConditions and Resistors
83 """
84 result = []
85 item: list | None
86 for item in [self.__nodes, self.__resistors, self.__boundaries]:
87 if item is not None and item != []:
88 result.extend(item)
89 return result
91 @property
92 def all_equation_objects(self) -> list[EquationItem]:
93 return [*self.all, *self.other_equation_objects]
95 @property
96 def other_equation_objects(self) -> list[EquationItem]:
97 """
98 Like `RCObjects.all` but not the connected RC objects were returned but all other `EquationItem` s.
100 This is mainly used for `InternalHeatSource` s and in the future for other input items.
102 Returns
103 -------
104 list[InternalHeatSource] :
105 All other `EquationItem` s.
106 Until now, it's just all InternalHeatSource items.
107 """
108 return self.internal_heat_sources
110 @property
111 def internal_heat_sources(self) -> list[InternalHeatSource]:
112 if self.__internal_heat_sources is None:
113 self.__internal_heat_sources = [n.internal_heat_source for n in self.nodes if n.internal_heat_source]
114 return self.__internal_heat_sources
116 def set_lists(self,
117 capacitors: list = None,
118 resistors: list = None,
119 boundaries: list = None):
120 if capacitors is not None:
121 assert isinstance(capacitors, list)
122 self.__nodes = capacitors
123 if resistors is not None:
124 assert isinstance(resistors, list)
125 self.__resistors = resistors
126 if boundaries is not None:
127 assert isinstance(boundaries, list)
128 self.__boundaries = boundaries
130 @property
131 def raw_data(self) -> tuple:
132 return (
133 self.__nodes,
134 self.__resistors,
135 self.__boundaries,
136 self.__input_objs
137 )
139 def wipe_all(self):
140 """
141 Deletes every raw data.
142 """
143 self.__nodes = None
144 self.__resistors = None
145 self.__boundaries = None
146 self.__input_objs = None
147 self.__internal_heat_sources = None
149 def get_all_objects(self, variant: type) -> list:
150 """
151 Returns a list with tuples with all objects in the network with requested variant/type.
153 Parameters
154 ----------
155 variant : type
156 The ``type`` of the objects that will be returned.
157 Will be used for the ``isinstance()`` match.
159 Returns
160 -------
161 list :
162 The list with all objects in the network with requested variant.
163 """
164 result = []
166 for element in self.all:
167 if isinstance(element, variant):
168 result.append(element)
169 return result
171 def set_loaded_data(self, loaded_objects: RCObjects):
172 """
173 Replaces all attributes with the ones of the loaded `RCObjects`.
175 Parameters
176 ----------
177 loaded_objects : RCObjects
178 The loaded `RCObjects` that should "replace" self / should be used to overwrite self.
180 """
181 raw_data = loaded_objects.raw_data
182 self.__nodes = raw_data[0]
183 self.__resistors = raw_data[1]
184 self.__boundaries = raw_data[2]
185 self.__input_objs = raw_data[3]
188initial_rc_objects = RCObjects()
191class SymbolMixin:
193 @property
194 @abstractmethod
195 def symbols(self) -> list:
196 """
197 Returns a list of all sympy.symbols of the object, except time dependent symbols.
199 Must be in the same order as self.values.
201 Returns
202 -------
203 list :
204 The list of sympy.symbols.
205 """
206 pass
208 @property
209 @abstractmethod
210 def values(self) -> list:
211 """
212 Returns a list of all values of all object symbols, except of time dependent symbols.
214 Must be in the same order as self.symbols.
216 Returns
217 -------
218 list :
219 The list of sympy.symbols.
220 """
221 pass
223 @property
224 def time_dependent_symbols(self) -> list:
225 """
226 Returns a list of all symbols that are needed to calculate self.value.
228 The list is sorted by the name of the symbols for deterministic reasons.
230 Returns
231 -------
232 list :
233 All symbols that are needed to calculate self.value (symbols that are time dependent and will be calculated
234 between time steps).
235 """
236 symbols = {sym for value in self.values if isinstance(value, sp.Expr) for sym in value.free_symbols}
237 return sorted(symbols, key=lambda s: s.name)
240class EquationItem(SymbolMixin, ABC):
241 item_counter = 0
243 def __init__(self):
244 EquationItem.item_counter += 1
245 self.id: int = EquationItem.item_counter
246 self._index = None
248 @classmethod
249 def reset_counter(cls):
250 EquationItem.item_counter = 0
253class RCSolution:
254 def __init__(self, rc_objects: RCObjects = initial_rc_objects):
255 self.rc_objects = rc_objects
256 self.__result_vectors: np.ndarray | Any = None
257 self._input_vectors: list = []
258 self.time_steps: np.ndarray | Any = None
260 self._temperature_dataframe = None
261 self._dataframe = None
263 self.last_saved_timestep_index = 0
265 @property
266 def input_exists(self) -> bool:
267 if self._input_vectors:
268 return True
269 return False
271 @property
272 def inputs(self):
273 return self.rc_objects.inputs
275 @property
276 def nodes(self):
277 return self.rc_objects.nodes
279 @property
280 def input_vectors(self) -> np.ndarray | Any:
281 if self._input_vectors:
282 return np.concatenate(self._input_vectors, axis=0)
283 return None
285 def last_value_input(self, index):
286 return self._input_vectors[-1][-1, index]
288 def append_to_input(self, new_input_vector: np.ndarray):
289 self._input_vectors.append(new_input_vector.reshape(1, -1))
291 def delete_last_input(self):
292 if len(self._input_vectors) > 0:
293 self._input_vectors.pop()
295 def delete_solution_except_first(self):
296 """
297 Deletes every solution except for the time_step == 0.
298 """
299 self.__result_vectors = self.__result_vectors[0, :]
300 first_input_vector = self._input_vectors[0]
301 self._input_vectors = [first_input_vector]
302 self.time_steps = np.array(self.time_steps[0])
304 self._temperature_dataframe = None
305 self._dataframe = None
307 self.last_saved_timestep_index = 0
309 def save_solution(self, path_with_name_and_ending: str):
310 with open(path_with_name_and_ending, "wb") as f:
311 pickle.dump(self.raw_data, f)
312 if self.time_steps is not None:
313 self.last_saved_timestep_index = len(self.time_steps) - 1
315 def save_to_file_only(self, t: np.ndarray, y: np.ndarray, path_with_name_and_ending: str):
316 """
317 Only save the passed solution to file and delete the input vector except the last value.
319 The t and y values are not saved into the solution object to prevent high RAM usage. The input vector is
320 deleted for the same reason. Only the last value of the input vector is kept to use it for further solving.
322 Parameters
323 ----------
324 t : np.ndarray
325 The time step array of the solution.
326 y : np.ndarray
327 The result array/matrix of the solution.
328 path_with_name_and_ending : str
329 Where to save the solution.
330 """
331 if t is not None:
332 data = [
333 y,
334 self._input_vectors[-len(t):],
335 t,
336 None,
337 None
338 ]
339 with open(path_with_name_and_ending, "wb") as f:
340 pickle.dump(data, f)
341 # delete the input vector to make space for new one
342 self._input_vectors = []
344 def save_new_solution(self, path_with_name_and_ending: str):
345 """
346 Like save_solution, but it only saves the new solution data instead of everything.
348 New data is defined as all data that is saved in a time step greater than self.last_saved_timestep.
350 Parameters
351 ----------
352 path_with_name_and_ending : str
353 Where to save the solution.
355 Returns
356 -------
358 """
359 if self.time_steps is None:
360 self.save_solution(path_with_name_and_ending)
361 else:
362 # save everything from self.last_saved_timestep_index up to the last time step
363 with open(path_with_name_and_ending, "wb") as f:
364 pickle.dump(self.raw_data_last(self.last_saved_timestep_index + 1), f)
366 if self.time_steps is not None:
367 self.last_saved_timestep_index = len(self.time_steps) - 1
369 def write_paraview_data(self,
370 folder: str,
371 increment: int = None,
372 number_of_saved_steps: int = None,
373 time_increment: int | float | Any = None,
374 use_degrees_celsius: bool = True):
375 """
376 Parsing the result data to vtu files that can be used to visualize the result in paraview.
378 It is recommended to not generate more than several thousand resolution steps.
379 If no increment is given, about 2000 result steps are created.
381 Parameters
382 ----------
383 folder : str
384 The folder to save the paraview data in.
385 increment : int, optional
386 If specified, only the incremental part of the result is parsed (to shorten the write time).
387 If None, the increment is calculated so that a maximum of 2000 results are created.
388 number_of_saved_steps : int, optional
389 Works like increment but instead of walking a fixed step width it calculates the increment using the
390 given number to get the number of steps.
391 Or: Say, how many steps should be saved (+-1).
392 Overwrites the increment parameter.
393 time_increment : int | float | Any, optional
394 The x'th time that should be saved in seconds.
395 If given, the increment parameter is not used.
396 Example usage: time_increment = 120
397 The result files are created for results every 120 seconds.
398 use_degrees_celsius : bool, optional
399 If True, the temperatures are saved as degree Celsius. In Kelvin otherwise.
401 Returns
402 -------
404 """
405 static_points, static_cells = write_static_cells_vtu(self.nodes, folder)
407 if increment is None:
408 increment = self.time_steps_count // 2000
409 if number_of_saved_steps is not None:
410 increment = self.time_steps_count // number_of_saved_steps
411 increment = max(1, int(increment))
412 if time_increment is not None:
413 assert isinstance(time_increment, float) or isinstance(time_increment, int)
414 mask = None
415 if time_increment is not None:
416 mask = np.diff(np.floor((self.time_steps - self.time_steps[0]) / time_increment), prepend=-1).astype(
417 bool)
418 if mask is not None:
419 print("Time increment is used.")
420 result = self.result_vectors[mask, :]
421 time_steps = self.time_steps[mask]
422 else:
423 print("Manual increment is used.")
424 result = self.result_vectors[::increment, :]
425 time_steps = self.time_steps[::increment]
427 if use_degrees_celsius:
428 result -= 273.15
430 write_temperature_vtu(
431 result,
432 time_steps,
433 static_points,
434 static_cells,
435 folder,
436 step_interval=1 # reducing of the result was already done, so parse every step
437 )
439 @property
440 def raw_data(self):
441 return [
442 self.result_vectors,
443 self._input_vectors,
444 self.time_steps,
445 self._temperature_dataframe,
446 self._dataframe
447 ]
449 def raw_data_last(self, starting_index):
450 """
451 Like raw_data but only returns the values starting from `starting_index`.
453 Parameters
454 ----------
455 starting_index : int
456 The index where to start getting the data from.
458 Returns
459 -------
460 list :
461 A list with all raw data starting from `starting_index`.
462 """
463 result_vectors = None
464 if self.result_vectors is not None:
465 result_vectors = self.result_vectors[starting_index:, :]
466 input_vectors = []
467 if self._input_vectors:
468 input_vectors = self._input_vectors[starting_index:]
469 time_steps = None
470 if self.time_steps is not None:
471 time_steps = self.time_steps[starting_index:]
472 temperature_dataframe = None
473 if self._temperature_dataframe is not None:
474 temperature_dataframe = self._temperature_dataframe.iloc[starting_index:]
475 dataframe = None
476 if self._dataframe is not None:
477 dataframe = self._dataframe.iloc[starting_index:]
479 return [
480 result_vectors,
481 input_vectors,
482 time_steps,
483 temperature_dataframe,
484 dataframe
485 ]
487 def delete_solutions(self, confirm=False):
488 """
489 Use with care! Deletes all data if it's confirm=True.
490 """
491 if confirm:
492 self.__result_vectors = None
493 self._input_vectors = []
494 self.time_steps = None
496 self._temperature_dataframe = None
497 self._dataframe = None
499 self.last_saved_timestep_index = 0
500 else:
501 print("You have to confirm to delete all data.")
503 def load_solution(self,
504 path_with_name_and_ending: str,
505 save_combined_solution: bool = True,
506 last_time_step=None):
507 """
508 Loads a pickled solution. If the file is not found it searches for incremental solutions and loads them.
510 This method forces the load. That the hash matches the current network is the responsibility of the user.
512 The search for an incremental solution is done by using the hash as the start followed by a "_" and
513 everything after the hash is used as add-on to the name, except "_result". So if the requested file is:
514 fcd7d8e0f79c611c05db6e80457b8c3f0f2a696acb5e213cc0516bed468e9497_normal_static_result.pickle
515 it searches for:
516 fcd7d8e0f79c611c05db6e80457b8c3f0f2a696acb5e213cc0516bed468e9497_normal_static_0000100_*.pickle
517 The number is the time step and after the time step everything can follow (".*").
519 Parameters
520 ----------
521 path_with_name_and_ending : str
522 The path where the solution is stored with name and ending.
523 save_combined_solution : bool, optional
524 If True, the solution is saved in a pickle file if it was concatenated from incremental solutions.
525 last_time_step : int | float, optional
526 The last time step which defines the complete solution.
527 If given, it is checked if the whole solution was loaded or just a part out of it.
528 Used, to continue canceled simulations.
530 Returns
531 -------
533 """
534 path_with_name_and_ending = os.path.normpath(path_with_name_and_ending)
535 if os.path.exists(path_with_name_and_ending):
536 with open(path_with_name_and_ending, "rb") as f:
537 loaded_solution = pickle.load(f)
538 raw_data = loaded_solution
539 self._append_or_initialize(raw_data)
540 return True
541 else:
542 # try to load incremental data
543 folder, rc_hash, name_add_on = self._get_hash_and_folder_from_file_name(path_with_name_and_ending)
544 if rc_hash is not None:
545 success = self._load_all_solutions(folder, rc_hash, name_add_on)
546 solution_is_complete = True
547 if last_time_step is not None and success:
548 last_loaded_step = self.time_steps[-1]
549 if last_loaded_step < last_time_step:
550 print(f"Solution loaded {last_loaded_step}/{last_time_step}")
551 solution_is_complete = False
552 success = last_loaded_step
553 if success and save_combined_solution and solution_is_complete:
554 self.save_solution(path_with_name_and_ending)
555 return success
556 return False
558 def _append_or_initialize(self, raw_data):
559 """
560 Append new data from raw_data to existing attributes if they are not None,
561 otherwise initialize them.
563 Parameters
564 ----------
565 raw_data : tuple
566 A tuple containing (result_vectors, input_vectors, time_steps,
567 temperature_dataframe, dataframe).
568 """
569 if raw_data[0] is not None:
570 if self.__result_vectors is None:
571 self.__result_vectors: np.ndarray = raw_data[0]
572 else:
573 self.__result_vectors = np.concatenate((self.__result_vectors, raw_data[0]), axis=0)
575 if raw_data[1] is not None:
576 if self._input_vectors is None:
577 self._input_vectors: list = raw_data[1]
578 else:
579 self._input_vectors.extend(raw_data[1])
581 if raw_data[2] is not None:
582 if self.time_steps is None:
583 self.time_steps: np.ndarray = raw_data[2]
584 else:
585 self.time_steps = np.concatenate((self.time_steps, raw_data[2]), axis=0)
587 if raw_data[3] is not None:
588 if self._temperature_dataframe is None:
589 self._temperature_dataframe = raw_data[3]
590 else:
591 self._temperature_dataframe = pd.concat([self._temperature_dataframe, raw_data[3]])
593 if raw_data[4] is not None:
594 if self._dataframe is None:
595 self._dataframe = raw_data[4]
596 else:
597 self._dataframe = pd.concat([self._dataframe, raw_data[4]])
599 def _load_all_solutions(self, save_dir: str, save_prefix: str, hash_add_on: str | Any = None):
600 """
601 Load all incrementally saved pickled solution files matching the pattern
602 '{save_prefix}.*{float(batch_end):09.0f}.*.(pickle|pkl)' in ascending order of batch_end.
604 The current solution is replaced if some exist!
606 Parameters
607 ----------
608 save_dir : str
609 Directory where the pickled files are stored.
610 save_prefix : str
611 Prefix of the saved files to identify relevant pickles.
612 hash_add_on : str | Any, optional
613 A string that is added to the hash with a "_" to serve as identifier.
615 Returns
616 -------
617 list
618 A list of loaded solutions sorted by batch_end.
619 """
620 if hash_add_on is not None:
621 save_prefix += "_" + hash_add_on
622 # pattern = re.compile(rf"{re.escape(save_prefix)}.*?(\d+(?:\.\d+)?)(?!.*\d).*\.(?:pickle|pkl)$")
623 pattern = re.compile(
624 rf"{re.escape(save_prefix)}_?(\d+(?:\.\d+)?)(?=_(?:s)?\.pickle$|(?:s)?\.pickle$|_(?:s)?\.pkl$|(?:s)?\.pkl$)")
625 solutions = []
627 for file_name in os.listdir(save_dir):
628 match = pattern.match(file_name)
629 if match:
630 batch_end = float(match.group(1))
631 solutions.append((batch_end, file_name))
633 solutions.sort(key=lambda x: x[0])
635 all_data = [[] for _ in range(5)]
637 for _, file_name in solutions:
638 with open(os.path.join(save_dir, file_name), "rb") as f:
639 raw_data = pickle.load(f)
640 for i, data in enumerate(raw_data):
641 if data is not None:
642 all_data[i].append(data)
644 # Concatenate once for each data type
645 if all_data[0]:
646 self.__result_vectors = np.concatenate(all_data[0], axis=0)
648 if all_data[1]:
649 self._input_vectors = []
650 for vectors in all_data[1]:
651 self._input_vectors.extend(vectors)
653 if all_data[2]:
654 self.time_steps = np.concatenate(all_data[2], axis=0)
656 if all_data[3]:
657 self._temperature_dataframe = pd.concat(all_data[3], ignore_index=True)
659 if all_data[4]:
660 self._dataframe = pd.concat(all_data[4], ignore_index=True)
661 #
662 # for _, file_name in solutions:
663 # with open(os.path.join(save_dir, file_name), "rb") as f:
664 # raw_data = pickle.load(f)
665 # self._append_or_initialize(raw_data)
667 if len(solutions) > 0:
668 return True
669 return False
671 @staticmethod
672 def _get_hash_and_folder_from_file_name(path: str):
673 """
674 Extract the hash and folder from the full path of a saved pickle file.
676 The hash is defined as all characters in the filename before the first underscore.
678 The name add-on is everything followed by the hash (without the underscore) but without the .pickle extension
679 and without "_result". If no characters match this, None is returned.
681 Parameters
682 ----------
683 path : str
684 Full path to the saved pickle file including the file name (with or without ending)
686 Returns
687 -------
688 tuple[str, str]
689 A tuple (save_dir, save_prefix).
690 """
691 folder = os.path.dirname(path)
692 filename = os.path.basename(path)
693 # split at _
694 split = filename.split("_", 1)
695 rc_hash = None
696 name_add_on = None
697 if len(split) > 1:
698 rc_hash = split[0]
699 name_add_on = split[1]
700 else:
701 # split at .
702 split = filename.split(".", 1)
703 if len(split) > 1:
704 if len(split[0]) == 64:
705 rc_hash = split[0]
706 name_add_on = ".".join(split[1:]) if len(split) > 1 else None
707 if name_add_on is not None:
708 name_add_on = name_add_on.removesuffix(".pickle")
709 name_add_on = name_add_on.removesuffix(".pkl")
710 name_add_on = name_add_on.removesuffix("_result")
711 return folder, rc_hash, name_add_on
713 @property
714 def exist(self) -> bool:
715 if self.result_vectors is not None:
716 return True
717 return False
719 @property
720 def result_vectors(self) -> np.ndarray | Any:
721 return self.__result_vectors
723 @result_vectors.setter
724 def result_vectors(self, value):
725 pass
727 @property
728 def time_steps_count(self):
729 return len(self.time_steps.flatten())
731 @property
732 def temperature_vectors_pandas(self) -> pd.DataFrame:
733 """
734 Returns the solution with all node results in one column within a pd.DataFrame.
736 The DataFrame is cached. To reset it, set ``self.result_vectors = None`` .
738 Returns
739 -------
740 pd.DataFrame :
741 The solution. Each column represents the solution of one node. Each row the time step.
742 The index of the DataFrame are the time steps.
743 """
744 if self._temperature_dataframe is None:
745 self._temperature_dataframe = pd.DataFrame(self.result_vectors)
746 self._temperature_dataframe.index = self.time_steps
747 return self._temperature_dataframe
749 @property
750 def dataframe(self):
751 if self._dataframe is None:
752 merge = np.concatenate((self.result_vectors, self.input_vectors), axis=1)
753 self._dataframe = pd.DataFrame(merge, columns=[*[n.id for n in self.nodes], *[i.id for i in self.inputs]],
754 index=self.time_steps)
755 return self._dataframe
757 @property
758 def temperature_vectors(self) -> np.ndarray:
759 """
760 Like temperature_vectors_pandas, but returns a numpy array.
762 This value is not cached.
764 Returns
765 -------
766 np.ndarray :
767 The solution. Each column represents the solution of one node. Each row the time step.
768 """
769 return self.result_vectors
771 @property
772 def t(self):
773 return self.time_steps
775 @t.setter
776 def t(self, value):
777 self.time_steps = value
779 @property
780 def y(self):
781 return self.result_vectors
783 @y.setter
784 def y(self, value):
785 assert isinstance(value, np.ndarray)
786 self.__result_vectors = value
788 # def set_loaded_data(self, loaded_solutions: RCSolution):
789 # """
790 # Replaces all attributes with the ones of the loaded `RCSolution`.
791 #
792 # This is used when the object can hardly be replaced by a loaded one, e.g. when used in composition.
793 #
794 # Parameters
795 # ----------
796 # loaded_solutions : RCSolution
797 # The loaded solution object that should "replace" self / should be used to overwrite self.
798 #
799 # """
800 # self.time_steps = loaded_solutions.time_steps
801 # self.result_vectors = loaded_solutions.result_vectors
802 # self._input_vectors: np.ndarray | Any = loaded_solutions._input_vectors
804 def save_last_step(self, file_path):
805 """
806 Saves a vector with the solution of the last time step.
808 The saved data can be set as initial values using `RCNetwork.load_initial_values`
810 Parameters
811 ----------
812 file_path : str | Any
813 The file path with name and ending.
814 """
815 last_solution = self.temperature_vectors[-1, :]
816 last_input = self.input_vectors[-1, :]
817 with open(file_path, "wb") as f:
818 pickle.dump((last_solution, last_input), f)
821solution_object = RCSolution()
824class ObjectWithPorts:
826 def __init__(self):
827 self.__neighbours = []
829 @property
830 def neighbours(self):
831 return self.__neighbours
833 @property
834 def ports(self):
835 """
836 Alias for `self.neighbours`.
837 """
838 return self.__neighbours
840 def __iter__(self):
841 """
842 Iterate over `self.neighbours`.
844 Returns
845 -------
846 ObjectWithPorts :
847 The neighbours.
848 """
849 for neighbour in self.neighbours:
850 yield neighbour
852 def connect(self, neighbour, direction: tuple | list | np.ndarray | Any = None, node_direction_points_to=None):
853 """
854 Add the given object/neighbour to the `self.neighbours` list.
856 The neighbour itself will connect ``self`` to its neighbours list.
857 E.g.: If node2 should be connected to node1, node2's neighbours list appends self.
859 The direction is a possibility to set the direction between two connected nodes manually. It is used for
860 connected `BoundaryCondition` s and `Node` s.
861 The direction is set for the neighbour. The
863 Parameters
864 ----------
865 neighbour : ObjectWithPorts
866 The neighbour to connect to. It will connect ``self`` to itself.
867 This is the Node the manual direction is set on!
868 direction : tuple | list | np.ndarray, optional
869 If not None, a direction is set manually to node_direction_points_at.
870 Either none or both node_direction_points_at and direction must be passed.
871 node_direction_points_to : TemperatureNode, optional
872 If not None, this is the node to which the direction points at (looking from neighbour).
873 Either none or both node_direction_points_at and direction must be passed.
874 Must be a TemperatureNode.
875 """
876 self.__neighbours.append(neighbour)
877 neighbour.__single_connect(self)
878 if (direction is not None) ^ (node_direction_points_to is not None):
879 raise ValueError("Either none or both node_direction_points_at and direction must be passed.")
881 # set direction of neighbour using the node the direction points at
882 if direction is not None:
883 direction: tuple | np.ndarray | list
884 from pyrc.core.nodes import Node
885 from pyrc.core.components.node import TemperatureNode
886 assert isinstance(node_direction_points_to, TemperatureNode)
887 direction: np.ndarray = np.array(direction) / np.linalg.norm(np.array(direction))
888 if not isinstance(neighbour, Node):
889 assert isinstance(self, Node), "The direction can only set on Nodes, not TemperatureNodes/Resistors."
890 # set direction at self to node_direction_points_to
891 self.set_direction(node_direction_points_to, direction)
892 else:
893 node: TemperatureNode = node_direction_points_to
894 neighbour.set_direction(node, direction)
896 def double_connect(self, neighbour1, neighbour2):
897 self.connect(neighbour1)
898 self.connect(neighbour2)
900 def __single_connect(self, neighbour):
901 """
902 Like `self.connect`, but it doesn't set the connection to the neighbour, too.
904 Parameters
905 ----------
906 neighbour : ObjectWithPorts
907 The neighbour to connect to.
908 """
909 self.__neighbours.append(neighbour)
912class ConnectedFlowObject:
914 def __init__(self):
915 self._volume_flow = None
916 # manual switch to determine if the volume flows are balanced: sum(inflows)-sum(outflows) = 0
917 # This switch should be actuated from the algorithm that distributes the flows or a method that checks the
918 # balance.
919 self.volume_flow_is_balanced = False
921 @property
922 def guess_volume_flow(self):
923 return self._volume_flow
925 @property
926 def volume_flow(self):
927 return self._volume_flow
929 @abstractmethod
930 def check_balance(self) -> bool:
931 pass
933 @property
934 def sources(self) -> list:
935 return []
937 @property
938 def sinks(self) -> list:
939 return []
941 @property
942 @abstractmethod
943 def balance(self):
944 pass
947class Geometric:
948 """
949 Skeleton for a geometric object that only contains the position in 3D space.
951 Defines getter and setter for X, Y and Z coordinates. If a 2D vector is given,
952 the Z coordinate is set to 0.
954 Parameters
955 ----------
956 position : np.ndarray
957 Either 2D or 3D position of the object as array.
958 fixed_position : bool
959 If ``True``, the position cannot be changed. Overwrites both `fixed_z` and `fixed_xy` parameters.
960 fixed_z : bool
961 If ``True``, the z coordinate cannot be changed.
962 fixed_xy : bool
963 If ``True``, the x and y coordinates cannot be changed.
964 """
966 def __init__(self,
967 position: np.ndarray | tuple,
968 fixed_position: bool = False,
969 fixed_z: bool = False,
970 fixed_xy: bool = False):
972 if fixed_position:
973 fixed_z = fixed_xy = True
974 self.fixed_z = fixed_z
975 self.fixed_xy = fixed_xy
977 self.position = np.array(position, dtype=np.float64)
979 @property
980 def position(self) -> np.ndarray:
981 return self.__position
983 @position.setter
984 def position(self, value: np.ndarray):
985 if self.fixed_z or self.fixed_xy:
986 raise FixedPositionError()
987 if not isinstance(value, np.ndarray):
988 value = np.array(value, dtype=np.float64)
989 assert 2 <= len(value) <= 3
990 if len(value) == 2:
991 value = np.array([*value, 0], dtype=np.float64)
992 if np.isnan(value).any():
993 raise ValueError
994 self.__position = value.copy()
996 @property
997 def x(self):
998 return self.__position[0]
1000 @x.setter
1001 def x(self, value):
1002 if self.fixed_xy:
1003 raise FixedXYError()
1004 assert is_numeric(value)
1005 self.__position = np.array([value, self.y, self.z])
1007 @property
1008 def y(self):
1009 return self.__position[1]
1011 @y.setter
1012 def y(self, value):
1013 if self.fixed_xy:
1014 raise FixedXYError()
1015 assert is_numeric(value)
1016 self.__position = np.array([self.x, value, self.z])
1018 @property
1019 def z(self):
1020 return self.__position[2]
1022 @z.setter
1023 def z(self, value):
1024 if self.fixed_z:
1025 raise FixedZError()
1026 assert is_numeric(value)
1027 self.__position = np.array([self.x, self.y, value])
1030class Cell(Geometric):
1032 def __init__(self,
1033 position: np.ndarray | tuple,
1034 delta: np.ndarray | tuple = None,
1035 ):
1036 """
1037 Extends the `Geometric` class to a cell with length, height and depth.
1039 Parameters
1040 ----------
1041 position : np.ndarray
1042 The position of the node in 2D/3D space.
1043 If 2D, a zero is added for the z coordinate.
1044 delta : np.ndarray | tuple, optional
1045 Delta vector [delta_x, delta_y, delta_z].
1046 """
1047 # visualize the Cell using vpython
1048 self.__vbox: Optional[
1049 box] = None # must be initialized before Geometric init is called because of position setter
1050 self.opacity = 1
1052 super().__init__(position=position)
1054 self.delta = delta
1056 @Geometric.position.setter
1057 def position(self, value):
1058 Geometric.position.fset(self, value)
1059 if self.__vbox is not None:
1060 self.update_vbox_geometry()
1062 @property
1063 def vbox(self):
1064 if self.__vbox is None:
1065 self.vbox = box(
1066 pos=vector(*self.position),
1067 size=vector(*self.delta),
1068 color=vector(0.6, 0.6, 0.6),
1069 opacity=self.opacity,
1070 shininess=0.0,
1071 )
1072 return self.__vbox
1074 @vbox.setter
1075 def vbox(self, value):
1076 assert isinstance(value, box)
1077 self.__vbox = value
1079 @property
1080 def delta(self):
1081 """
1082 Returns the delta vector.
1084 Returns
1085 -------
1086 np.ndarray :
1087 The delta vector.
1088 """
1089 return self.__delta
1091 @delta.setter
1092 def delta(self, value):
1093 value = np.asarray(value).ravel()
1094 if value.size == 1:
1095 self.__delta = np.append(value, [1.0, 1.0])
1096 elif value.size == 2:
1097 self.__delta = np.append(value, 1.0)
1098 elif value.size == 3:
1099 self.__delta = value
1100 else:
1101 raise ValueError(f"Expected 2 or 3 elements, got {value.size}.")
1103 if self.__vbox is not None:
1104 self.update_vbox_geometry()
1106 @property
1107 def delta_x(self):
1108 return self.delta[0]
1110 @property
1111 def delta_y(self):
1112 return self.delta[1]
1114 @property
1115 def delta_z(self):
1116 return self.delta[2]
1118 @property
1119 def length(self):
1120 return self.delta_x
1122 @property
1123 def height(self):
1124 return self.delta_y
1126 @property
1127 def depth(self):
1128 return self.delta_z
1130 @property
1131 def boundaries(self) -> list:
1132 """
1133 Returns the boundaries of the cell.
1135 The format looks like:
1136 [-x, x, -y, y, -z, z]
1138 Returns
1139 -------
1140 list :
1141 The boundaries.
1142 """
1143 negative = (self.position - self.delta / 2).tolist()
1144 positive = (self.position + self.delta / 2).tolist()
1145 return [item for pair in zip(negative, positive) for item in pair]
1147 def update_vbox_geometry(self) -> None:
1148 """
1149 Update position/size (geometry) of the vbox (visualization). Call only if geometry changes.
1150 """
1151 self.vbox.pos = vector(*self.position)
1152 self.vbox.size = vector(*self.delta)
1154 def update_color(self, temperature: float, t_min: float = 263.15, t_max: float = 413.15,
1155 colormap="managua") -> None:
1156 """
1157 Update the color of the vbox for visualization.
1159 Parameters
1160 ----------
1161 temperature : float
1162 The temperature in Kelvin to set.
1163 t_min : float | int, optional
1164 The minimal temperature for the color code.
1165 t_max : float | int, optional
1166 The maximal temperature for the color code.
1167 colormap : str, optional
1168 The colormap to use. See pyrc.core.visualization.color.color.py or the txt files in there respectively.
1169 """
1170 assert t_max > t_min
1171 t_norm = (temperature - t_min) / (t_max - t_min)
1172 r, g, b = value_to_rgb(t_norm, colormap)
1173 self.vbox.color = vector(r, g, b)
1175 def _apply_alignment(self, alignment, reference_position, other_deltas):
1176 """
1177 Apply alignment string to calculate new position.
1179 Parameters
1180 ----------
1181 alignment : str
1182 Face alignment specification with optional pairing override.
1183 Format: Space-separated or consecutive axis specifications.
1184 Each specification: [self_dir][other_dir]axis
1185 where self_dir and other_dir are '+' or '-'.
1186 Default pairing: opposite faces ('+x' pairs with '-x' of other)
1187 Examples: 'x' (default opposite), 'xy' (both default opposite),
1188 '+-x' (explicit opposite), '++x' (same face),
1189 '-y' (self -y with other +y), '+-x -+y' (multiple axes with space)
1190 reference_position : np.ndarray
1191 Starting position to update
1192 other_deltas : np.ndarray
1193 Delta values of object being placed
1195 Returns
1196 -------
1197 np.ndarray
1198 Updated position after applying alignment
1200 Raises
1201 ------
1202 ValueError
1203 If alignment string is malformed
1204 """
1205 new_position = reference_position.copy()
1207 # Remove spaces and parse character by character
1208 alignment_no_space = alignment.replace(' ', '')
1210 i = 0
1211 while i < len(alignment_no_space):
1212 self_direction = 1
1213 other_direction = -1
1214 signs = []
1216 # Parse signs (0, 1, or 2)
1217 while i < len(alignment_no_space) and alignment_no_space[i] in ['-', '+']:
1218 signs.append(-1 if alignment_no_space[i] == '-' else 1)
1219 i += 1
1220 if len(signs) > 2:
1221 raise ValueError(f"More than 2 signs found before axis at position {i}")
1223 # Parse axis
1224 if i < len(alignment_no_space) and alignment_no_space[i] in ['x', 'y', 'z']:
1225 axis = alignment_no_space[i]
1226 i += 1
1227 else:
1228 if signs:
1229 raise ValueError(f"Found direction signs without following axis at position {i}")
1230 break
1232 # Apply signs
1233 if len(signs) == 1:
1234 self_direction = signs[0]
1235 other_direction = -signs[0] # Opposite of self
1236 elif len(signs) == 2:
1237 self_direction = signs[0]
1238 other_direction = signs[1]
1239 # else: len(signs) == 0, use defaults (1, -1)
1241 axis_index = {'x': 0, 'y': 1, 'z': 2}[axis]
1242 new_position[axis_index] = (self.position[axis_index] +
1243 self_direction * self.delta[axis_index] / 2 -
1244 other_direction * other_deltas[axis_index] / 2)
1246 return new_position
1248 def place_adjacent(self, other_cell, alignment):
1249 """
1250 Place other_cell adjacent to self aligned at specified face(s).
1252 Parameters
1253 ----------
1254 other_cell : Cell
1255 Cell to be placed adjacent to self
1256 alignment : str
1257 Face alignment specification with optional pairing override.
1258 Format: Space-separated or consecutive axis specifications.
1259 Each specification: [self_dir][other_dir]axis
1260 where self_dir and other_dir are '+' or '-'.
1261 Default pairing: opposite faces ('+x' pairs with '-x' of other)
1262 Examples: 'x' (default opposite), 'xy' (both default opposite),
1263 '+-x' (explicit opposite), '++x' (same face),
1264 '-y' (self -y with other +y), '+-x -+y' (multiple axes with space)
1265 """
1266 other_cell.position = self._apply_alignment(
1267 alignment,
1268 other_cell.position,
1269 other_cell.delta
1270 )
1272 return other_cell
1274 def create_adjacent(self, alignment, **kwargs):
1275 """
1276 Create and place new cell of same type adjacent to self.
1278 Parameters
1279 ----------
1280 alignment : str
1281 Face alignment specification with optional pairing override.
1282 Format: Space-separated or consecutive axis specifications.
1283 Each specification: [self_dir][other_dir]axis
1284 where self_dir and other_dir are '+' or '-'.
1285 Default pairing: opposite faces ('+x' pairs with '-x' of other)
1286 Examples:\n
1287 ``x`` (default opposite), ``xy`` (both default opposite),\n
1288 ``+-x`` (explicit opposite), ``++x`` (same face),\n
1289 ``-y`` (self -y with other +y), ``+-x -+y`` (multiple axes with space)
1290 **kwargs
1291 Arguments passed to constructor (``delta`` etc.)
1293 Returns
1294 -------
1295 Cell or subclass :
1296 New `Cell` of same type as ``self`` placed adjacent to ``self``
1297 """
1298 if "position" not in kwargs:
1299 kwargs["position"] = self.position.copy()
1300 new_cell = type(self)(**kwargs)
1301 return self.place_adjacent(new_cell, alignment)
1303 @classmethod
1304 def create_grid(cls,
1305 grid_size,
1306 delta: np.ndarray | tuple = None,
1307 center_position=None,
1308 **kwargs) -> np.ndarray:
1309 """
1310 Create a 3D grid of cells.
1312 Parameters
1313 ----------
1314 grid_size : tuple of int
1315 Number of cells (nx, ny, nz).
1316 delta : np.ndarray | tuple
1317 Total dimensions in one vector.
1318 If single delta-values are given, too, the delta vector is used.
1319 delta : float
1320 Total length in x,y,z direction.
1321 center_position : np.ndarray, optional
1322 Center position of the grid. Defaults to origin.
1323 **kwargs
1324 Additional arguments passed to constructor.
1326 Returns
1327 -------
1328 np.ndarray
1329 3D array of shape (nx, ny, nz) containing Cell instances.
1330 """
1332 nx, ny, nz = grid_size
1333 cell_deltas = delta / np.array([nx, ny, nz])
1334 center = np.zeros(3) if center_position is None else center_position.copy()
1336 cells = np.empty((nx, ny, nz), dtype=object)
1337 for ix in range(nx):
1338 for iy in range(ny):
1339 for iz in range(nz):
1340 offset = (np.array([ix, iy, iz]) - (np.array([nx, ny, nz]) - 1) / 2) * cell_deltas
1341 cells[ix, iy, iz] = cls(
1342 position=center + offset,
1343 delta=cell_deltas,
1344 **kwargs
1345 )
1346 return cells
1348 def create_grid_aligned(self,
1349 alignment,
1350 grid_size,
1351 total_delta,
1352 position=None,
1353 **kwargs) -> np.ndarray[Cell]:
1354 """
1355 Create a 3D grid of cells aligned to self.
1357 Parameters
1358 ----------
1359 alignment : str
1360 Face alignment specification. See _apply_alignment for format.
1361 grid_size : tuple of int
1362 Number of cells (nx, ny, nz).
1363 total_delta : float
1364 Total length in x,y,z direction.
1365 position : np.ndarray, optional
1366 Base position, updated by alignment. Defaults to origin.
1367 **kwargs
1368 Additional arguments passed to constructor.
1370 Returns
1371 -------
1372 np.ndarray
1373 3D array of shape (nx, ny, nz) containing Cell instances.
1374 """
1375 base = np.zeros(3) if position is None else position.copy()
1376 total_deltas = np.array(total_delta)
1377 center = self._apply_alignment(alignment, base, total_deltas)
1378 return type(self).create_grid(
1379 grid_size, total_delta,
1380 center_position=center, **kwargs
1381 )
1384class Material:
1386 def __init__(self,
1387 name,
1388 density: float | int | np.number = np.nan,
1389 heat_capacity: float | int | np.number = np.nan,
1390 thermal_conductivity: float | int | np.number = np.nan,
1391 ):
1392 """
1393 Container to hold all material properties.
1395 Parameters
1396 ----------
1397 name : str
1398 The name/identifier of the material.
1399 density : float | int | np.number
1400 The density of the material in kg/m^3.
1401 heat_capacity : float | int | np.number
1402 The heat capacity of the material in J/kg/K.
1403 thermal_conductivity : float | int | np.number
1404 The thermal conductivity of the material in W/m/K.
1405 """
1406 self.__name = name
1407 self.__density = density
1408 self.__heat_capacity = heat_capacity
1409 self.__thermal_conductivity = thermal_conductivity
1411 def __new__(cls, *args, **kwargs):
1412 """
1413 This blocks the creation of instances from this class because it should only be used the children of self.
1415 Parameters
1416 ----------
1417 args
1418 kwargs
1419 """
1420 if cls is Material:
1421 children = [sub_cls.__name__ for sub_cls in Material.__subclasses__()]
1422 raise TypeError(f"Cannot instantiate {cls.__name__} directly. Use the children: {', '.join(children)}")
1423 return super().__new__(cls)
1425 @property
1426 def name(self):
1427 return self.__name
1429 @property
1430 def density(self):
1431 return self.__density
1433 @property
1434 def heat_capacity(self):
1435 return self.__heat_capacity
1437 @property
1438 def thermal_conductivity(self):
1439 return self.__thermal_conductivity
1442class Fluid(Material):
1444 def __init__(
1445 self,
1446 *args,
1447 kin_viscosity: float | int | np.number = np.nan,
1448 prandtl_number: float | int | np.number = np.nan,
1449 grashof_number: float | int | np.number = np.nan,
1450 **kwargs,
1451 ):
1452 """
1454 Parameters
1455 ----------
1456 args
1457 kin_viscosity : float | int | np.number
1458 The kinematic viscosity of the material in m^2/s.
1459 prandtl_number : float | int | np.number
1460 The Prandtl number of the material without unit.
1461 grashof_number : float | int | np.number
1462 The Grashof number of the material without unit.
1463 kwargs
1464 """
1465 super().__init__(*args, **kwargs)
1466 self.__kin_viscosity = kin_viscosity
1467 self.__prandtl_number = prandtl_number
1468 self.__grashof_number = grashof_number
1470 @property
1471 def kin_viscosity(self):
1472 return self.__kin_viscosity
1474 @property
1475 def prandtl_number(self):
1476 if self.__prandtl_number is None or not is_set(self.__prandtl_number):
1477 self.__prandtl_number = self.kin_viscosity * self.density * self.heat_capacity / self.thermal_conductivity
1478 return self.__prandtl_number
1480 @property
1481 def Pr(self):
1482 return self.prandtl_number
1484 @property
1485 def grashof_number(self):
1486 return self.__grashof_number
1488 @property
1489 def Gr(self):
1490 return self.__grashof_number
1492 @property
1493 def rayleigh_number(self):
1494 return self.grashof_number * self.prandtl_number
1497class Solid(Material):
1498 def __init__(self, *args, **kwargs):
1499 super().__init__(*args, **kwargs)
1502class CompositeMaterialSolid(Solid):
1503 """
1504 Combine multiple materials using ratios.
1505 """
1507 def __init__(self, name, materials, ratios, **kwargs):
1508 total_ratio = sum(ratios)
1509 weights = [ratio / total_ratio for ratio in ratios]
1511 density = sum(mat.density * weight for mat, weight in zip(materials, weights))
1512 heat_capacity = sum(mat.heat_capacity * weight for mat, weight in zip(materials, weights))
1513 thermal_conductivity = sum(mat.thermal_conductivity * weight for mat, weight in zip(materials, weights))
1515 solid_kwargs = {
1516 "name": name,
1517 "density": density,
1518 "heat_capacity": heat_capacity,
1519 "thermal_conductivity": thermal_conductivity
1520 }
1521 solid_kwargs.update(kwargs)
1522 super().__init__(
1523 **solid_kwargs
1524 )
1527class CompositeMaterialFluid(Fluid):
1528 """
1529 Combine multiple fluid materials using ratios.
1530 """
1532 def __init__(self, name, materials, ratios, **kwargs):
1533 total_ratio = sum(ratios)
1534 weights = [ratio / total_ratio for ratio in ratios]
1536 density = sum(mat.density * weight for mat, weight in zip(materials, weights))
1537 heat_capacity = sum(mat.heat_capacity * weight for mat, weight in zip(materials, weights))
1538 thermal_conductivity = sum(mat.thermal_conductivity * weight for mat, weight in zip(materials, weights))
1539 kin_viscosity = sum(mat.kin_viscosity * weight for mat, weight in zip(materials, weights))
1541 fluid_kwargs = {
1542 "name": name,
1543 "density": density,
1544 "heat_capacity": heat_capacity,
1545 "thermal_conductivity": thermal_conductivity,
1546 "kin_viscosity": kin_viscosity
1547 }
1548 fluid_kwargs.update(kwargs)
1549 super().__init__(
1550 **fluid_kwargs
1551 )
1554def calculate_balance_for_resistors(node, resistors: list[MassTransport]):
1555 balance = 0
1556 for resistor in resistors:
1557 if resistor.sink == node:
1558 # resistor is source for node
1559 balance += resistor.volume_flow
1560 else:
1561 # resistor is sink for node
1562 balance -= resistor.volume_flow
1563 return balance
1566class EquationItemInput(Input, EquationItem, ABC): pass