Coverage for pyrc \ core \ nodes.py: 29%
408 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-13 16:59 +0200
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-13 16:59 +0200
1# -------------------------------------------------------------------------------
2# Copyright (C) 2026 Joel Kimmich, Tim Jourdan
3# ------------------------------------------------------------------------------
4# License
5# This file is part of PyRC, distributed under GPL-3.0-or-later.
6# ------------------------------------------------------------------------------
8from __future__ import annotations
10import warnings
12import numpy as np
13from collections import deque
14from typing import TYPE_CHECKING
15from abc import ABC
17from pyrc.core.components.capacitor import Capacitor
18from pyrc.core.inputs import BoundaryCondition, FlowBoundaryCondition, InternalHeatSource
19from pyrc.core.materials import Air
20from pyrc.core.components.templates import (
21 Material,
22 ObjectWithPorts,
23 Cell,
24 Geometric,
25 ConnectedFlowObject,
26 calculate_balance_for_resistors,
27 initial_rc_objects,
28 RCObjects,
29 RCSolution,
30 solution_object,
31 Fluid,
32 Solid,
33)
35if TYPE_CHECKING:
36 from pyrc.core.resistors import MassTransport
37 from pyrc.core.settings import Settings
38 from pyrc.core.components.resistor import Resistor
41class ConnectedFlowCapacitorObject(Capacitor, ConnectedFlowObject, ABC):
42 pass
45class Node(Capacitor, Cell):
46 def __init__(
47 self,
48 material: Material,
49 temperature: float | int | np.number,
50 position: np.ndarray | tuple,
51 temperature_derivative: float | int | np.number = 0,
52 internal_heat_source: bool | int | float | np.number = False,
53 internal_heat_source_type=InternalHeatSource,
54 delta: np.ndarray | tuple | list = None,
55 rc_objects=initial_rc_objects,
56 rc_solution: RCSolution = solution_object,
57 **kwargs: float | int | np.ndarray | Settings,
58 ):
59 """
61 Parameters
62 ----------
63 material : Material
64 Container with all material properties stored in the node is made up from.
65 temperature : float | int | np.number
66 The temperature of the node.
67 It is recommended to use the SI unit Kelvin instead of degrees Celsius.
68 position : np.ndarray
69 The position of the node in 2D/3D space.
70 If 2D, a zero is added for the z coordinate.
71 temperature_derivative : float | int | np.number, optional
72 The temperature derivative of the node.
73 internal_heat_source : bool | int | float | np.number, optional
74 If True, an internal heat source term is added.
75 It can be accessed using Node.internal_heat_source
76 If a number, the number is used to set as volumetric heat source power in W/m**3.
77 internal_heat_source_type : type, optional
78 The internal heat source type.
79 Can be each `InternalHeatSource` type.
80 delta : np.ndarray | tuple, optional
81 Delta vector [delta_x, delta_y, delta_z].
82 kwargs : dict
83 Parameters passed to `InternalHeatSource`/its type.
85 Notes
86 -----
87 Be aware of the fact that the **Node's properties can change during network creation!**
88 This means, that if you add connections to a Node it's volume/mass can change. So every object, that needs
89 the Node's volume/mass should be initialized **after** the whole network was build up. This affects,
90 among other things, `Input` s like `InternalHeatSource` s, especially `Radiation`.
91 To avoid this, you must pass the whole object (node/BC) to the Input and program it in a way that the values
92 are created earliest when values are requested (and therefore the network must be set up (hopefully)).
93 """
94 Capacitor.__init__(
95 self,
96 capacity=None,
97 temperature=temperature,
98 temperature_derivative=temperature_derivative,
99 rc_objects=rc_objects,
100 rc_solution=rc_solution,
101 )
102 Cell.__init__(self, position=position, delta=delta)
103 self.material: Material | Fluid | Solid = material
104 self._volume = None
105 self._kwargs = kwargs # saved for __copy__
107 # This must be at the end of init. It uses some attributes of node already (like node.id)
108 if internal_heat_source:
109 if not isinstance(internal_heat_source, bool):
110 internal_heat_source = internal_heat_source_type(
111 node=self, specific_power_in_w_per_cubic_meter=internal_heat_source, **kwargs
112 )
113 elif internal_heat_source_type is not None:
114 internal_heat_source = internal_heat_source_type(node=self, **kwargs)
115 else:
116 internal_heat_source = InternalHeatSource(node=self, **kwargs)
117 self.add_internal_heat_source(internal_heat_source)
119 def __copy__(self):
120 # TODO: This was not checked well. Have a closer look on it.
121 cls = self.__class__
122 internal_heat_source = False if self.internal_heat_source is None else True
123 internal_heat_source_type = InternalHeatSource
124 if internal_heat_source:
125 internal_heat_source_type = type(self.internal_heat_source)
126 if isinstance(self.internal_heat_source, InternalHeatSource):
127 Warning(
128 "A Node with an InternalHeatSource is copied. The attribute\n"
129 "\tspecific_power_in_w_per_meter_squared\n"
130 "might be lost."
131 )
132 if hasattr(self.internal_heat_source, "volume_specific_power"):
133 internal_heat_source = self.internal_heat_source.volume_specific_power
134 return cls(
135 material=self.material,
136 temperature=self.temperature,
137 position=self.position,
138 temperature_derivative=self.temperature_derivative,
139 internal_heat_source=internal_heat_source,
140 internal_heat_source_type=internal_heat_source_type,
141 delta=self.delta,
142 rc_objects=self.rc_objects,
143 rc_solution=self.solutions,
144 **self._kwargs,
145 )
147 def __str__(self):
148 return self.__repr__()
150 def __repr__(self):
151 return f"{self.__class__.__name__} {self.id}: {self.material.__class__.__name__}; ϑ={self.temperature}"
153 @property
154 def capacity(self) -> float | int:
155 """
156 Returns the capacity (not specific) of the node in J/K.
158 Returns
159 -------
160 float | int :
161 The capacity of the node in J/K.
162 """
163 if self._capacity is None:
164 self.calculate_capacity()
165 return self._capacity
167 @property
168 def volume(self) -> float | int:
169 if self._volume is None:
170 self.calculate_volume()
171 return self._volume
173 @property
174 def mass(self) -> float | int | np.number:
175 """
176 Returns the mass flow of the node in kg.
178 Returns
179 -------
180 float | int :
181 The mass flow of the node in kg.
182 If the density isn't defined np.nan is returned.
183 """
184 if self.material.density:
185 return self.volume * self.material.density
186 return np.nan
188 def update_color(self, t_min: float = 263.15, t_max: float = 413.15, colormap="managua") -> None:
189 """
190 Update the color of the vbox for visualization.
192 Parameters
193 ----------
194 t_min : float | int, optional
195 The minimal temperature for the color code.
196 t_max : float | int, optional
197 The maximal temperature for the color code.
198 colormap : str, optional
199 The colormap to use. See pyrc.core.visualization.color.color.py or the txt files in there respectively.
200 """
201 Cell.update_color(self.temperature, t_min=t_min, t_max=t_max, colormap=colormap)
203 def area(self, normal: np.ndarray) -> float:
204 """
205 Returns the area of the desired plane in m**2.
207 Parameters
208 ----------
209 normal : np.ndarray
210 The direction to the wanted plane.
211 E.g.: if (1,0,0) the y-z-plane is wanted so the area of the yz-plane is returned.
213 Returns
214 -------
215 float :
216 The area in m^2.
217 """
218 direction_abs = np.abs(normal)
220 multiplication_vector = np.array([1, 1, 1]) - direction_abs
221 area = np.prod(self.delta, where=multiplication_vector.astype(bool))
222 return float(area.item())
224 def calculate_capacity(self):
225 self._capacity = self.material.heat_capacity * self.mass # J/K
227 def calculate_volume(self):
228 self._volume = self.delta_x * self.delta_y * self.delta_z # m^3
230 # add volume for each channel node
231 channel_nodes = self.get_connected_nodes(ChannelNode)
232 channel_node: ChannelNode
233 for channel_node in channel_nodes:
234 raw_volume = channel_node.delta_x * channel_node.delta_y * channel_node.delta_z
235 solid_volume = raw_volume - channel_node.volume
236 self._volume += solid_volume / 4
238 @property
239 def values(self) -> list:
240 result = [*super().values_without_capacity, self.capacity]
241 return result
243 def get_area(self, asking_node: Node | Cell | BoundaryCondition | Capacitor) -> float | int | np.number:
244 """
245 Returns the area that the asking node sees. Through this area travels the heat between both nodes.
247 Example: If the asking node has the same position in x and y direction but is shifted in z direction,
248 the area of the cell face X-Y is returned.
250 A non-Cell class is only valid if its direction to/from self was set manually in
251 self.manual_directions[asking_node]. Otherwise, it will throw an error.
253 Parameters
254 ----------
255 asking_node : Node | Cell | BoundaryCondition | Capacitor
256 The Node that asks for the area of this Node. It is used to calculate the shift in position and to return
257 the associated surface area accordingly.
258 If it doesn't inherit from Cell, it is assumed that the direction is set manually in manual_directions dict
259 using ``self`` as key.
261 Returns
262 -------
263 float | int | np.number :
264 The area of the surface of the cell that has the normal vector of the line you get connecting both `self`
265 and `asking_node`.
266 """
267 # determine the shift in position between self and asking_node
268 direction_to_asking_node = self.get_direction(asking_node=asking_node)
270 return self.area(direction_to_asking_node)
272 def get_conduction_length(self, asking_node: Node | ConnectedFlowObject | Geometric | Capacitor):
273 """
274 Returns the length to the asking node. It's half the delta value in direction to the asking node.
276 Parameters
277 ----------
278 asking_node : Geometric
279 The Node asking for the (half) length between itself and self.
281 Returns
282 -------
283 float :
284 The half-length of self. Or: Half the delta value of self pointing towards asking_node.
285 """
286 direction = self.get_direction(asking_node) # returns the direction to (not from) the asking node.
288 return np.linalg.norm(direction * self.delta) * 0.5
290 def get_both_perpendicular_lengths(self, asking_node: Geometric | Node | BoundaryCondition) -> tuple[float, float]:
291 """
292 Returns both perpendicular lengths between the self and the asking node.
294 Parameters
295 ----------
296 asking_node : Geometric | Node | BoundaryCondition
297 The node asking.
299 Returns
300 -------
301 tuple[float, float] :
302 Both length in no particular order.
303 """
304 # get the direction to the asking node
305 direction = self.get_direction(asking_node)
307 # Define the mapping of axes
308 axis_map = {
309 (1, 0, 0): ["delta_y", "delta_z"], # If direction is X, take Y and Z
310 (0, 1, 0): ["delta_x", "delta_z"], # If direction is Y, take X and Z
311 (0, 0, 1): ["delta_x", "delta_y"], # If direction is Z, take X and Y
312 }
314 # Convert direction to tuple
315 direction_tuple = tuple(np.abs(direction).astype(int))
317 if direction_tuple not in axis_map:
318 return np.nan, np.nan # Shouldn't happen if input is correct
320 # Extract the two perpendicular delta values
321 delta_1 = getattr(self, axis_map[direction_tuple][0])
322 delta_2 = getattr(self, axis_map[direction_tuple][1])
324 return delta_1, delta_2
326 def get_smaller_edge(self, asking_node: Geometric | Node | BoundaryCondition | Capacitor):
327 """
328 Returns the smaller edge of the cell using the asking node to set the direction.
330 Used to determine the diameter in `MassFlowNode` s.
332 Parameters
333 ----------
334 asking_node : Geometric | Node | BoundaryCondition
335 The node that asks for the diameter of this `Node`. It determines the direction to get the right dimension.
337 Returns
338 -------
339 float | int | np.number :
340 The diameter (smallest length of the cell side pointing towards `asking_node`).
341 """
342 deltas = self.get_both_perpendicular_lengths(asking_node=asking_node)
344 # Return the smaller of the two
345 return min(deltas)
348class MassFlowNode(Node, ConnectedFlowObject):
349 def __init__(
350 self,
351 material: Material,
352 temperature,
353 position,
354 flow_area: float | int | str = None,
355 flow_length: float | int | str = None,
356 temperature_derivative=0,
357 flow_velocity=None,
358 mass_flow=None,
359 volume_flow=None,
360 rc_objects: RCObjects = initial_rc_objects,
361 rc_solution: RCSolution = solution_object,
362 **kwargs,
363 ):
364 """
366 Parameters
367 ----------
368 temperature : float | int | np.number
369 The (initial) temperature of the node.
370 position : np.ndarray
371 The position of the node in 2D/3D space.
372 If 2D, a zero is added for the z coordinate.
373 flow_area : float | int | str
374 Defining the area that is used for flow calculations (determining the velocity and the Courant number).
375 Can be a string:
376 xy: xy-plane, xz: xz-plane, yz: yz-plane
377 If not given the smallest flow area is used that points to a `ConnectedFlowObject`.
378 flow_length : float | int | str
379 Defining the length that the flow takes.
380 Can be a string:
381 x: delta_x, y: delta_y, z: delta_z
382 If not given the matching one for the flow area is used.
383 temperature_derivative : float | int | np.number
384 The temperature derivative of the node.
385 flow_velocity : float | int | np.number
386 The flow velocity of the node.
387 It doesn't matter which sign it has. Only the absolute value is used.
388 """
389 ConnectedFlowObject.__init__(self)
390 Node.__init__(
391 self,
392 material=material,
393 temperature=temperature,
394 position=position,
395 temperature_derivative=temperature_derivative,
396 rc_objects=rc_objects,
397 rc_solution=rc_solution,
398 **kwargs,
399 )
400 # Compute area if it's given as a plane
401 self._flow_length = None
402 self._flow_area = None
403 if isinstance(flow_length, str):
404 self._flow_length = getattr(self, f"delta_{flow_length}")
405 else:
406 self._flow_length = flow_length
407 if isinstance(flow_area, str):
408 match flow_area.lower():
409 case "xy" | "yx":
410 self._flow_area = self.delta_x * self.delta_y
411 self._flow_length = self.delta_z
412 case "xz" | "zx":
413 self._flow_area = self.delta_x * self.delta_z
414 self._flow_length = self.delta_y
415 case "yz" | "zy":
416 self._flow_area = self.delta_y * self.delta_z
417 self._flow_length = self.delta_x
418 case _:
419 raise ValueError("Invalid plane. Use 'xy', 'xz', or 'yz'.")
420 else:
421 self._flow_area = flow_area # Assume it's given as a direct value in m².
422 # TODO: set flow_area and flow_direction depending on each other.
424 self._flow_propagation = False
426 if sum(val is not None for val in [flow_velocity, mass_flow, volume_flow]) != 1:
427 self._flow_propagation = True
429 self._passed_flow_velocity = flow_velocity
431 if volume_flow is not None:
432 self._volume_flow = volume_flow
433 elif flow_velocity is not None:
434 # calculate later because the area isn't known until all connections were made
435 self._volume_flow = None
436 elif mass_flow is not None:
437 self._volume_flow = mass_flow / self.material.density
439 # Initialize cache
440 self._velocity = None
441 self._mass_flow = None
442 self._flow_direction = None
444 def courant_number(self, time_step: float):
445 """
446 Calculates the courant number of the node.
448 Parameters
449 ----------
450 time_step : float | int | np.number
451 The (maximum) time step used for calculation.
453 Returns
454 -------
455 float | np.number :
456 The courant number.
457 """
458 return self.velocity * time_step / self.flow_length
460 @property
461 def flow_length(self) -> float:
462 """
463 The effective length of the channel. Use only if no asking_node is given!
465 This assumes that each ChannelNode is connected to at least one solid node at its sides. This node is used to
466 get the perpendicular direction and define which sides defining the length.
468 Returns
469 -------
470 float :
471 The length, but it can be wrong in few cases.
472 """
473 if self._flow_length is None:
474 # request flow_area and calculate the value in it
475 _ = self.flow_area
476 return self._flow_length
478 @property
479 def flow_area(self):
480 if self._flow_area is None or self._flow_length is None:
481 # get the areas between self and ConnectedFlowObjects and take the smallest one.
482 result = np.inf
483 flow_length = 0
484 flow_obj: MassFlowNode | BoundaryCondition
485 for flow_obj in self.mass_flow_neighbours:
486 area = self.get_area(flow_obj)
487 if result > area:
488 result = area
489 flow_length = self.get_direction(flow_obj) * self.delta
490 self._flow_area = result
491 self._flow_length = np.linalg.norm(flow_length)
492 return self._flow_area
494 @property
495 def volume_flow(self):
496 """Returns the cached volume flow rate (m³/s)."""
497 if self._flow_propagation and self._volume_flow is None:
498 self.propagate_flow()
499 # volume flow should be set now
500 self._flow_propagation = False
501 if self._volume_flow is None:
502 if self._passed_flow_velocity:
503 self._volume_flow = self._passed_flow_velocity * self.flow_area
504 else:
505 raise ValueError("This shouldn't have happened!")
506 return self._volume_flow
508 @property
509 def velocity(self):
510 """Returns the cached flow velocity (m/s) or calculates it if needed."""
511 if self._velocity is None:
512 self._velocity = self.volume_flow / self.flow_area
513 return self._velocity
515 @property
516 def mass_flow(self):
517 """Returns the cached mass flow rate (kg/s) or calculates it if needed."""
518 if self._mass_flow is None:
519 self._mass_flow = self.volume_flow * self.material.density
520 return self._mass_flow
522 def reset_properties(self):
523 """Clears cached flow velocity and mass flow to force recalculation."""
524 super().reset_properties()
525 self._velocity = None
526 self._mass_flow = None
528 @property
529 def sources(self) -> list[Capacitor | Node]:
530 """
531 Returns a list with all nodes where volume flow is flowing into the node.
533 No check is performed which node type is put into the list, but it should only be `MassFlowNode` s and its
534 descendants or `FlowBoundaryCondition` s.
536 Returns
537 -------
538 list :
539 All nodes serving as volume flow source.
540 """
541 result = []
542 seen = set()
543 resistor: MassTransport
544 for resistor in self.connected_mass_transport_resistors():
545 if resistor.sink == self:
546 connected = resistor.get_connected_node(self)
547 if connected not in seen:
548 seen.add(connected)
549 result.append(connected)
550 return result
552 @property
553 def sinks(self) -> list[MassFlowNode | FlowBoundaryCondition]:
554 """
555 Returns a list of all `MassFlowNode` s into which the flow is flowing.
557 No check is performed which node type is put into the list, but it should only be `MassFlowNode` s and its
558 descendants or `FlowBoundaryCondition` s.
560 Returns
561 -------
562 list :
563 All nodes serving as volume flow sink.
564 """
565 result = []
566 seen = set()
567 resistor: MassTransport
568 for resistor in self.connected_mass_transport_resistors():
569 if resistor.source == self:
570 connected = resistor.get_connected_node(self)
571 if connected not in seen:
572 seen.add(connected)
573 result.append(connected)
574 return result
576 @property
577 def diameter(self) -> float:
578 """
579 Returns the hydraulic diameter.
581 Returns
582 -------
583 float :
584 The hydraulic diameter of the node.
585 """
586 len1, len2 = self.get_minimal_perpendicular_lengths_to_flow()
587 len1 = float(len1)
588 len2 = float(len2)
589 # return hydraulic diameter
590 return 2 * len1 * len2 / (len1 + len2)
592 def get_resistor_volume_flow(self, resistor: MassTransport):
593 """
594 Returns the volume flow for the given resistor using mass balance.
596 Parameters
597 ----------
598 resistor : MassTransport
599 The `Resistor` to determine the volume flow for.
601 Returns
602 -------
603 float :
604 The volume flow of the given `Resistor`.
605 """
606 result = 0
607 other_node_on_resistor = resistor.get_connected_node(self)
608 for sink in self.sinks:
609 if sink != other_node_on_resistor:
610 result -= sink.volume_flow
611 source: ConnectedFlowObject
612 for source in self.sources:
613 if source != other_node_on_resistor:
614 result += source.volume_flow
616 # if other_node_on_resistor is in sinks, the volume flow is positive (if it flows from source to sink) and
617 # vice versa: if in sources, the volume flow is negative
618 if other_node_on_resistor in self.sources:
619 result *= -1
620 return result
622 @property
623 def start_nodes(self) -> list:
624 """
625 Returns all nodes that are defined as a start of the mass flow.
627 Used for the algorithm to set the direction of the mass flow (make_velocity_direction).
629 Returns
630 -------
631 list :
632 A list with all mass flow start BoundaryConditions.
633 """
634 result = []
635 element: ObjectWithPorts
636 for element in self.rc_objects.all:
637 if isinstance(element, BoundaryCondition):
638 if element.is_mass_flow_start:
639 result.append(element)
640 return result
642 @property
643 def mass_flow_neighbours(self):
644 """
645 Returns all neighbour `MassFlowNode` s.
647 Returns
648 -------
649 list :
650 The list with all neighbour `MassFlowNode` s
651 """
652 return [*self.sinks, *self.sources]
654 @property
655 def balance(self):
656 return calculate_balance_for_resistors(self, self.connected_mass_transport_resistors())
658 def check_balance(self) -> bool:
659 """
660 Returns True if every parent `ConnectedFlowObject` has a balanced volume flow.
662 This implies that the algorithm to set the volume flow always walks from the flow start to its end and not
663 the other way around.
665 Returns
666 -------
667 bool :
668 True if every parent `ConnectedFlowObject` has a balanced volume flow. False otherwise.
669 """
670 if self.volume_flow_is_balanced:
671 return True
672 # walk the flow upwards and check every source if they are balanced.
673 source: ConnectedFlowObject
674 for source in self.sources:
675 if source.check_balance():
676 continue
677 else:
678 return False
679 # check if self is balanced
680 balance = self.balance
682 if np.isclose(balance, 0):
683 self.volume_flow_is_balanced = True
684 return True
685 return False
687 @property
688 def flow_direction(self) -> list[np.ndarray]:
689 result = []
690 if self._flow_direction is None:
691 flow_direction = np.array((0, 0, 0))
692 for neighbour in self.mass_flow_neighbours:
693 flow_direction += np.abs(neighbour.get_direction(self))
694 for i, value in enumerate(flow_direction):
695 if value > 0:
696 result.append(np.zeros(3))
697 result[-1][i] = 1
698 self._flow_direction = flow_direction
699 return self._flow_direction
701 def get_minimal_perpendicular_lengths_to_flow(self) -> tuple:
702 flow_direction: list = self.flow_direction
704 if len(flow_direction) > 1:
705 minima = np.partition(self.delta, 1)[0:2]
706 return minima[0], minima[1]
708 mask_vector = np.array([1, 1, 1]) - np.abs(flow_direction[0])
710 masked_array = np.ma.array(self.delta, mask=mask_vector)
711 # invert mask of masked array
712 masked_array = np.ma.array(masked_array.data, mask=~masked_array.mask)
713 return tuple(masked_array.compressed())
715 def get_perpendicular_length_parallel_to_flow(self, asking_node: Node):
716 """
717 Returns the length that is perpendicular to the connection asking_node - self and is parallel to the flow.
719 Used for the effective area between a Solid and a MassFlowNode as well as to get the length of ChannelNodes.
721 Parameters
722 ----------
723 asking_node : Node
725 Returns
726 -------
727 float :
728 The length.
729 """
730 first_vector = self.mass_flow_neighbours[0].get_direction(self)
732 second_vector = self.get_direction(asking_node)
734 # make cross product to get the third vector
735 third_vector = np.abs(np.cross(first_vector, second_vector))
737 return float((third_vector.reshape(1, 3) @ self.delta.reshape(3, 1)).item())
739 def connected_mass_transport_resistors(self) -> list[MassTransport]:
740 """
741 Returns all connected mass transport resistors.
743 Returns
744 -------
745 list[MassTransport] :
746 The list.
747 """
748 from pyrc.core.resistors import MassTransport
750 result = []
751 for neighbour in self.neighbours:
752 if isinstance(neighbour, MassTransport):
753 result.append(neighbour)
754 return result
756 @property
757 def mass_transport_resistors(self) -> list:
758 """
759 Returns a list with all MassTransport resistors in the network.
761 Returns
762 -------
763 list :
764 The list with all MassTransport resistors in the network.
765 """
766 from pyrc.core.resistors import MassTransport
768 return self.rc_objects.get_all_objects(MassTransport)
770 @property
771 def mass_flow_nodes(self) -> list:
772 """
773 Returns a list with all MassTransport resistors in the network.
775 Returns
776 -------
777 list :
778 The list with all MassTransport resistors in the network.
779 """
780 from pyrc.core.resistors import MassFlowNode
782 return self.rc_objects.get_all_objects(MassFlowNode)
784 # def make_velocity_direction(self):
785 # """
786 # Determines and sets the source and sink for each resistor in a network using iterative DFS.
787 # """
788 # from pyrc.core.resistors import MassTransport
789 # resistors: list[MassTransport] = self.mass_transport_resistors
790 # start_nodes = self.start_nodes
791 #
792 # graph = {}
793 # resistor_map = {} # Maps node pairs (unordered) to resistors
794 #
795 # # Step 1: Build adjacency list and resistor mapping
796 # for resistor in resistors:
797 # u, v = resistor.neighbours # Each resistor connects exactly two nodes
798 # key = frozenset((u, v)) # Store as an unordered key
799 #
800 # graph[u].append(v)
801 # graph[v].append(u)
802 # resistor_map[key] = resistor # Single entry instead of two
803 #
804 # # Step 2: Start iterative DFS from each start node
805 # for start_node in start_nodes:
806 # stack = [(start_node, None)]
807 # visited = set()
808 #
809 # while stack:
810 # node, parent = stack.pop()
811 # if node in visited:
812 # continue
813 # visited.add(node)
814 #
815 # for neighbor in graph[node]:
816 # if neighbor == parent:
817 # continue
818 # key = frozenset((node, neighbor))
819 # resistor = resistor_map[key]
820 # resistor.source = node
821 # resistor.sink = neighbor
822 # stack.append((neighbor, node))
823 #
824 # # Ensure all resistors have source and sink set
825 # for resistor in resistors:
826 # if resistor.source is None or resistor.sink is None:
827 # raise ValueError(
828 # f"Resistor between {resistor.neighbours[0]} and {resistor.neighbours[1]} is not properly set.")
830 def propagate_flow(self):
831 """
832 Propagates the volume flow through all connected `ConnectedFlowObject` s from start to end.
834 It considers already set volume flows on its way to the end, but until now there is now backflow allowed. So
835 if the set volume flow of an sink exceeds the balance it will be overwritten.
837 Only works if the volume flow is set at the start `FlowBoundaryCondition`! It walks from the start to the end
838 and not the other way round.
839 """
840 start_nodes = self.start_nodes
841 queue = deque(start_nodes)
842 considered = set(start_nodes)
844 while queue:
845 node: ConnectedFlowCapacitorObject = queue.popleft()
847 if node.guess_volume_flow is None:
848 warnings.warn("That shouldn't have happened.")
849 continue # Skip nodes without a defined volume flow
850 node.volume_flow_is_balanced = True
852 sink_nodes: list[ConnectedFlowObject] = [n for n in node.sinks if n not in considered]
853 if not sink_nodes:
854 # dead end.
855 continue
857 defined_sinks = [n for n in sink_nodes if n.volume_flow_is_balanced]
858 undefined_sinks = [n for n in sink_nodes if not n.volume_flow_is_balanced]
860 if len(defined_sinks) > 0:
861 total_defined_flow = sum(n.guess_volume_flow for n in defined_sinks)
862 remaining_flow = node.guess_volume_flow - total_defined_flow
863 if np.isclose(remaining_flow, 0):
864 remaining_flow = 0
865 if remaining_flow < 0:
866 # overwrite all incorrect sink volume flows with equal distribution
867 # TODO: Maybe allow negative volume flows (back-flows)?
868 print("Warning: Set volume flow is overwritten to create no backflow.")
869 undefined_sinks.extend(defined_sinks)
870 defined_sinks = []
871 for sink in defined_sinks:
872 if sink not in considered:
873 # TODO: check if sources are all having a defined volume flow (this would save circle circuits)
874 add_sink_to_queue = True
875 for source in sink.sources:
876 if source.volume_flow_is_balanced:
877 continue
878 else:
879 add_sink_to_queue = False
880 break
881 if add_sink_to_queue:
882 queue.append(sink)
883 considered.add(sink)
884 else:
885 remaining_flow = node.guess_volume_flow
887 if len(undefined_sinks) > 0:
888 equal_flow = remaining_flow / len(undefined_sinks)
889 for sink in undefined_sinks:
890 if sink not in considered:
891 if sink.guess_volume_flow is None:
892 sink._volume_flow = 0 # Initialize if undefined
894 sink._volume_flow += equal_flow
895 # set volume flow in resistor in between
896 resistor = node.get_mass_transport_to_node(sink)
897 resistor._volume_flow = equal_flow
899 if all(s.volume_flow_is_balanced for s in sink.sources):
900 queue.append(sink)
901 considered.add(sink)
903 self.set_remaining_resistor_volume_flow()
905 def set_remaining_resistor_volume_flow(self):
906 """
907 Sets all unset volume flows of all `MassTransport` `Resistors`.
909 The most of them should have been set already in `self.propagate_flow()`.
911 """
912 resistors = [r for r in self.mass_transport_resistors if r.guess_volume_flow is None]
913 max_attempts = len(resistors)
914 queue = deque(resistors)
915 attempts = {resistor: 0 for resistor in resistors} # keep track how often the resistor is re-added to the queue
917 resistor: MassTransport
918 while queue:
919 resistor = queue.popleft()
921 node: ConnectedFlowCapacitorObject
922 for node in resistor.neighbours:
923 # First: check if node only has two resistors connected. If so, set volume_flow of node as own one.
924 if len(node.sinks) == 1 and len(node.sources) == 1:
925 resistor._volume_flow = node.volume_flow
926 break # go to next resistor
927 else: # Only executed if the first loop did not break
928 node: ConnectedFlowCapacitorObject
929 from pyrc.core.resistors import MassTransport
931 for node in resistor.neighbours:
932 # Determine the volume_flow balance of the node using the resistor.guess_volume_flow
933 # At least 'resistor' has no volume_flow set. If it is the only one, it can be calculated.
934 # Otherwise, resistor is added back to queue.
936 defined_resistors = {"sink": [], "source": []}
937 undefined_resistors = {"sink": [], "source": []}
939 connected_resistors = [n for n in node.neighbours if isinstance(n, MassTransport)]
940 res: MassTransport
941 for res in connected_resistors:
942 if res.guess_volume_flow is None:
943 if res.sink == node:
944 # resistor is source for node
945 undefined_resistors["source"].append(res)
946 else:
947 # resistor is sink for node
948 undefined_resistors["sink"].append(res)
949 else:
950 if res.sink == node:
951 # resistor is source for node
952 defined_resistors["source"].append(res)
953 else:
954 # resistor is sink for node
955 defined_resistors["sink"].append(res)
957 def calculate_balance(def_resistors: dict):
958 result = 0
959 for def_res in def_resistors["sink"]:
960 result -= def_res.guess_volume_flow
961 for def_res in def_resistors["source"]:
962 result += def_res.guess_volume_flow
963 return result
965 number_undefined_res = len(undefined_resistors["sink"]) + len(undefined_resistors["source"])
966 if number_undefined_res > 1:
967 continue # Volume flow cannot be determined --> check next node or go to 'else'
968 else:
969 if number_undefined_res == 1:
970 # Can only be resistor that isn't defined so its volume flow is set using the balance of
971 # the node.
972 resistor._volume_flow = calculate_balance(defined_resistors)
973 break
974 elif number_undefined_res == 0:
975 break
976 else:
977 # add resistor back to queue (if for loop did break!)
978 if attempts[resistor] >= max_attempts:
979 raise RuntimeError(
980 f"Could not determine attribute for {resistor} after {max_attempts} attempts."
981 )
982 attempts[resistor] += 1
983 queue.append(resistor)
986class ChannelNode(MassFlowNode):
987 @property
988 def flow_length(self) -> float:
989 """
990 The effective length of the channel. Use only if no asking_node is given!
992 This assumes that each ChannelNode is connected to at least one solid node at its sides. This node is used to
993 get the perpendicular direction and define which sides defining the length.
995 Returns
996 -------
997 float :
998 The length, but it can be wrong in few cases.
999 """
1000 _ = super().flow_area
1001 return self._flow_length
1003 @property
1004 def flow_area(self):
1005 self._flow_area = self.diameter**2 / 4 * np.pi
1006 return self._flow_area
1008 @property
1009 def diameter(self) -> float:
1010 """
1011 Returns the diameter by using neighbour nodes that aren't ChannelNodes.
1013 This assumes that each ChannelNode is connected to at least one solid node at its sides. This node is used to
1014 get the perpendicular direction and define which sides defining the diameter.
1016 Returns
1017 -------
1018 float :
1019 The diameter.
1020 """
1021 from pyrc.core.resistors import MassTransport
1023 neighbour: Resistor
1024 for neighbour in self.neighbours:
1025 if not isinstance(neighbour, MassTransport):
1026 asking_node = neighbour.get_connected_node(self)
1027 if not isinstance(asking_node, MassFlowNode):
1028 return self.get_smaller_edge(asking_node)
1030 return np.nan
1032 @property
1033 def volume(self) -> float | int:
1034 """
1035 Returns the volume of the circular air channel.
1037 Returns
1038 -------
1039 float | int | np.number :
1040 The volume of the air channel.
1041 """
1042 return self.flow_area * self.flow_length
1044 @property
1045 def circumference(self):
1046 return self.diameter * np.pi
1048 def get_pipe_area(self, asking_node: Node | BoundaryCondition) -> float | int | np.number:
1049 """
1050 Return the effective area of this ChannelNode.
1052 Parameters
1053 ----------
1054 asking_node : Node | BoundaryCondition
1055 The Node that asks for the area of this Node. It is used to calculate the shift in position and to return
1056 the associated surface area accordingly.
1058 Returns
1059 -------
1060 float | int | np.number :
1061 The area of the surface of the cell that has the normal vector of the line you get connecting both `self`
1062 and `asking_node`.
1063 """
1064 connected_mass_transport_resistors = self.mass_transport_resistors
1066 if isinstance(asking_node, ConnectedFlowObject):
1067 return self.flow_area
1069 if len(connected_mass_transport_resistors) < 3:
1070 return self.circumference * self.flow_length / 2
1071 else:
1072 return self.circumference * self.flow_length / len(connected_mass_transport_resistors)
1074 # def volume_flow(self, *args, **kwargs) -> float | int | np.number:
1075 # """
1076 # Returns the volume flow for the asking Resistor using a diameter that has the same value as the cell
1077 # height/width/depth.
1078 #
1079 # The length of which edge of the cell is used is determined by the asking resistor.
1080 #
1081 # Returns
1082 # -------
1083 # float | int | np.number :
1084 # The volume flow of the `MassFlowNode` in m^3/s.
1085 # """
1086 # return self.flow_area * self.velocity
1089class AirNode(MassFlowNode):
1091 def __init__(self,
1092 temperature: float | int | np.number,
1093 position: np.ndarray,
1094 temperature_derivative: float | int | np.number = 0,
1095 rc_objects: RCObjects = initial_rc_objects,
1096 rc_solution: RCSolution = solution_object,
1097 **kwargs):
1098 super().__init__(material=Air(),
1099 temperature=temperature,
1100 position=position,
1101 temperature_derivative=temperature_derivative,
1102 rc_objects=rc_objects,
1103 rc_solution=rc_solution,
1104 **kwargs)