Coverage for pyrc \ core \ nodes.py: 29%

408 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-13 16:59 +0200

1# ------------------------------------------------------------------------------- 

2# Copyright (C) 2026 Joel Kimmich, Tim Jourdan 

3# ------------------------------------------------------------------------------ 

4# License 

5# This file is part of PyRC, distributed under GPL-3.0-or-later. 

6# ------------------------------------------------------------------------------ 

7 

8from __future__ import annotations 

9 

10import warnings 

11 

12import numpy as np 

13from collections import deque 

14from typing import TYPE_CHECKING 

15from abc import ABC 

16 

17from pyrc.core.components.capacitor import Capacitor 

18from pyrc.core.inputs import BoundaryCondition, FlowBoundaryCondition, InternalHeatSource 

19from pyrc.core.materials import Air 

20from pyrc.core.components.templates import ( 

21 Material, 

22 ObjectWithPorts, 

23 Cell, 

24 Geometric, 

25 ConnectedFlowObject, 

26 calculate_balance_for_resistors, 

27 initial_rc_objects, 

28 RCObjects, 

29 RCSolution, 

30 solution_object, 

31 Fluid, 

32 Solid, 

33) 

34 

35if TYPE_CHECKING: 

36 from pyrc.core.resistors import MassTransport 

37 from pyrc.core.settings import Settings 

38 from pyrc.core.components.resistor import Resistor 

39 

40 

41class ConnectedFlowCapacitorObject(Capacitor, ConnectedFlowObject, ABC): 

42 pass 

43 

44 

45class Node(Capacitor, Cell): 

46 def __init__( 

47 self, 

48 material: Material, 

49 temperature: float | int | np.number, 

50 position: np.ndarray | tuple, 

51 temperature_derivative: float | int | np.number = 0, 

52 internal_heat_source: bool | int | float | np.number = False, 

53 internal_heat_source_type=InternalHeatSource, 

54 delta: np.ndarray | tuple | list = None, 

55 rc_objects=initial_rc_objects, 

56 rc_solution: RCSolution = solution_object, 

57 **kwargs: float | int | np.ndarray | Settings, 

58 ): 

59 """ 

60 

61 Parameters 

62 ---------- 

63 material : Material 

64 Container with all material properties stored in the node is made up from. 

65 temperature : float | int | np.number 

66 The temperature of the node. 

67 It is recommended to use the SI unit Kelvin instead of degrees Celsius. 

68 position : np.ndarray 

69 The position of the node in 2D/3D space. 

70 If 2D, a zero is added for the z coordinate. 

71 temperature_derivative : float | int | np.number, optional 

72 The temperature derivative of the node. 

73 internal_heat_source : bool | int | float | np.number, optional 

74 If True, an internal heat source term is added. 

75 It can be accessed using Node.internal_heat_source 

76 If a number, the number is used to set as volumetric heat source power in W/m**3. 

77 internal_heat_source_type : type, optional 

78 The internal heat source type. 

79 Can be each `InternalHeatSource` type. 

80 delta : np.ndarray | tuple, optional 

81 Delta vector [delta_x, delta_y, delta_z]. 

82 kwargs : dict 

83 Parameters passed to `InternalHeatSource`/its type. 

84 

85 Notes 

86 ----- 

87 Be aware of the fact that the **Node's properties can change during network creation!** 

88 This means, that if you add connections to a Node it's volume/mass can change. So every object, that needs 

89 the Node's volume/mass should be initialized **after** the whole network was build up. This affects, 

90 among other things, `Input` s like `InternalHeatSource` s, especially `Radiation`. 

91 To avoid this, you must pass the whole object (node/BC) to the Input and program it in a way that the values 

92 are created earliest when values are requested (and therefore the network must be set up (hopefully)). 

93 """ 

94 Capacitor.__init__( 

95 self, 

96 capacity=None, 

97 temperature=temperature, 

98 temperature_derivative=temperature_derivative, 

99 rc_objects=rc_objects, 

100 rc_solution=rc_solution, 

101 ) 

102 Cell.__init__(self, position=position, delta=delta) 

103 self.material: Material | Fluid | Solid = material 

104 self._volume = None 

105 self._kwargs = kwargs # saved for __copy__ 

106 

107 # This must be at the end of init. It uses some attributes of node already (like node.id) 

108 if internal_heat_source: 

109 if not isinstance(internal_heat_source, bool): 

110 internal_heat_source = internal_heat_source_type( 

111 node=self, specific_power_in_w_per_cubic_meter=internal_heat_source, **kwargs 

112 ) 

113 elif internal_heat_source_type is not None: 

114 internal_heat_source = internal_heat_source_type(node=self, **kwargs) 

115 else: 

116 internal_heat_source = InternalHeatSource(node=self, **kwargs) 

117 self.add_internal_heat_source(internal_heat_source) 

118 

119 def __copy__(self): 

120 # TODO: This was not checked well. Have a closer look on it. 

121 cls = self.__class__ 

122 internal_heat_source = False if self.internal_heat_source is None else True 

123 internal_heat_source_type = InternalHeatSource 

124 if internal_heat_source: 

125 internal_heat_source_type = type(self.internal_heat_source) 

126 if isinstance(self.internal_heat_source, InternalHeatSource): 

127 Warning( 

128 "A Node with an InternalHeatSource is copied. The attribute\n" 

129 "\tspecific_power_in_w_per_meter_squared\n" 

130 "might be lost." 

131 ) 

132 if hasattr(self.internal_heat_source, "volume_specific_power"): 

133 internal_heat_source = self.internal_heat_source.volume_specific_power 

134 return cls( 

135 material=self.material, 

136 temperature=self.temperature, 

137 position=self.position, 

138 temperature_derivative=self.temperature_derivative, 

139 internal_heat_source=internal_heat_source, 

140 internal_heat_source_type=internal_heat_source_type, 

141 delta=self.delta, 

142 rc_objects=self.rc_objects, 

143 rc_solution=self.solutions, 

144 **self._kwargs, 

145 ) 

146 

147 def __str__(self): 

148 return self.__repr__() 

149 

150 def __repr__(self): 

151 return f"{self.__class__.__name__} {self.id}: {self.material.__class__.__name__}; ϑ={self.temperature}" 

152 

153 @property 

154 def capacity(self) -> float | int: 

155 """ 

156 Returns the capacity (not specific) of the node in J/K. 

157 

158 Returns 

159 ------- 

160 float | int : 

161 The capacity of the node in J/K. 

162 """ 

163 if self._capacity is None: 

164 self.calculate_capacity() 

165 return self._capacity 

166 

167 @property 

168 def volume(self) -> float | int: 

169 if self._volume is None: 

170 self.calculate_volume() 

171 return self._volume 

172 

173 @property 

174 def mass(self) -> float | int | np.number: 

175 """ 

176 Returns the mass flow of the node in kg. 

177 

178 Returns 

179 ------- 

180 float | int : 

181 The mass flow of the node in kg. 

182 If the density isn't defined np.nan is returned. 

183 """ 

184 if self.material.density: 

185 return self.volume * self.material.density 

186 return np.nan 

187 

188 def update_color(self, t_min: float = 263.15, t_max: float = 413.15, colormap="managua") -> None: 

189 """ 

190 Update the color of the vbox for visualization. 

191 

192 Parameters 

193 ---------- 

194 t_min : float | int, optional 

195 The minimal temperature for the color code. 

196 t_max : float | int, optional 

197 The maximal temperature for the color code. 

198 colormap : str, optional 

199 The colormap to use. See pyrc.core.visualization.color.color.py or the txt files in there respectively. 

200 """ 

201 Cell.update_color(self.temperature, t_min=t_min, t_max=t_max, colormap=colormap) 

202 

203 def area(self, normal: np.ndarray) -> float: 

204 """ 

205 Returns the area of the desired plane in m**2. 

206 

207 Parameters 

208 ---------- 

209 normal : np.ndarray 

210 The direction to the wanted plane. 

211 E.g.: if (1,0,0) the y-z-plane is wanted so the area of the yz-plane is returned. 

212 

213 Returns 

214 ------- 

215 float : 

216 The area in m^2. 

217 """ 

218 direction_abs = np.abs(normal) 

219 

220 multiplication_vector = np.array([1, 1, 1]) - direction_abs 

221 area = np.prod(self.delta, where=multiplication_vector.astype(bool)) 

222 return float(area.item()) 

223 

224 def calculate_capacity(self): 

225 self._capacity = self.material.heat_capacity * self.mass # J/K 

226 

227 def calculate_volume(self): 

228 self._volume = self.delta_x * self.delta_y * self.delta_z # m^3 

229 

230 # add volume for each channel node 

231 channel_nodes = self.get_connected_nodes(ChannelNode) 

232 channel_node: ChannelNode 

233 for channel_node in channel_nodes: 

234 raw_volume = channel_node.delta_x * channel_node.delta_y * channel_node.delta_z 

235 solid_volume = raw_volume - channel_node.volume 

236 self._volume += solid_volume / 4 

237 

238 @property 

239 def values(self) -> list: 

240 result = [*super().values_without_capacity, self.capacity] 

241 return result 

242 

243 def get_area(self, asking_node: Node | Cell | BoundaryCondition | Capacitor) -> float | int | np.number: 

244 """ 

245 Returns the area that the asking node sees. Through this area travels the heat between both nodes. 

246 

247 Example: If the asking node has the same position in x and y direction but is shifted in z direction, 

248 the area of the cell face X-Y is returned. 

249 

250 A non-Cell class is only valid if its direction to/from self was set manually in 

251 self.manual_directions[asking_node]. Otherwise, it will throw an error. 

252 

253 Parameters 

254 ---------- 

255 asking_node : Node | Cell | BoundaryCondition | Capacitor 

256 The Node that asks for the area of this Node. It is used to calculate the shift in position and to return 

257 the associated surface area accordingly. 

258 If it doesn't inherit from Cell, it is assumed that the direction is set manually in manual_directions dict 

259 using ``self`` as key. 

260 

261 Returns 

262 ------- 

263 float | int | np.number : 

264 The area of the surface of the cell that has the normal vector of the line you get connecting both `self` 

265 and `asking_node`. 

266 """ 

267 # determine the shift in position between self and asking_node 

268 direction_to_asking_node = self.get_direction(asking_node=asking_node) 

269 

270 return self.area(direction_to_asking_node) 

271 

272 def get_conduction_length(self, asking_node: Node | ConnectedFlowObject | Geometric | Capacitor): 

273 """ 

274 Returns the length to the asking node. It's half the delta value in direction to the asking node. 

275 

276 Parameters 

277 ---------- 

278 asking_node : Geometric 

279 The Node asking for the (half) length between itself and self. 

280 

281 Returns 

282 ------- 

283 float : 

284 The half-length of self. Or: Half the delta value of self pointing towards asking_node. 

285 """ 

286 direction = self.get_direction(asking_node) # returns the direction to (not from) the asking node. 

287 

288 return np.linalg.norm(direction * self.delta) * 0.5 

289 

290 def get_both_perpendicular_lengths(self, asking_node: Geometric | Node | BoundaryCondition) -> tuple[float, float]: 

291 """ 

292 Returns both perpendicular lengths between the self and the asking node. 

293 

294 Parameters 

295 ---------- 

296 asking_node : Geometric | Node | BoundaryCondition 

297 The node asking. 

298 

299 Returns 

300 ------- 

301 tuple[float, float] : 

302 Both length in no particular order. 

303 """ 

304 # get the direction to the asking node 

305 direction = self.get_direction(asking_node) 

306 

307 # Define the mapping of axes 

308 axis_map = { 

309 (1, 0, 0): ["delta_y", "delta_z"], # If direction is X, take Y and Z 

310 (0, 1, 0): ["delta_x", "delta_z"], # If direction is Y, take X and Z 

311 (0, 0, 1): ["delta_x", "delta_y"], # If direction is Z, take X and Y 

312 } 

313 

314 # Convert direction to tuple 

315 direction_tuple = tuple(np.abs(direction).astype(int)) 

316 

317 if direction_tuple not in axis_map: 

318 return np.nan, np.nan # Shouldn't happen if input is correct 

319 

320 # Extract the two perpendicular delta values 

321 delta_1 = getattr(self, axis_map[direction_tuple][0]) 

322 delta_2 = getattr(self, axis_map[direction_tuple][1]) 

323 

324 return delta_1, delta_2 

325 

326 def get_smaller_edge(self, asking_node: Geometric | Node | BoundaryCondition | Capacitor): 

327 """ 

328 Returns the smaller edge of the cell using the asking node to set the direction. 

329 

330 Used to determine the diameter in `MassFlowNode` s. 

331 

332 Parameters 

333 ---------- 

334 asking_node : Geometric | Node | BoundaryCondition 

335 The node that asks for the diameter of this `Node`. It determines the direction to get the right dimension. 

336 

337 Returns 

338 ------- 

339 float | int | np.number : 

340 The diameter (smallest length of the cell side pointing towards `asking_node`). 

341 """ 

342 deltas = self.get_both_perpendicular_lengths(asking_node=asking_node) 

343 

344 # Return the smaller of the two 

345 return min(deltas) 

346 

347 

348class MassFlowNode(Node, ConnectedFlowObject): 

349 def __init__( 

350 self, 

351 material: Material, 

352 temperature, 

353 position, 

354 flow_area: float | int | str = None, 

355 flow_length: float | int | str = None, 

356 temperature_derivative=0, 

357 flow_velocity=None, 

358 mass_flow=None, 

359 volume_flow=None, 

360 rc_objects: RCObjects = initial_rc_objects, 

361 rc_solution: RCSolution = solution_object, 

362 **kwargs, 

363 ): 

364 """ 

365 

366 Parameters 

367 ---------- 

368 temperature : float | int | np.number 

369 The (initial) temperature of the node. 

370 position : np.ndarray 

371 The position of the node in 2D/3D space. 

372 If 2D, a zero is added for the z coordinate. 

373 flow_area : float | int | str 

374 Defining the area that is used for flow calculations (determining the velocity and the Courant number). 

375 Can be a string: 

376 xy: xy-plane, xz: xz-plane, yz: yz-plane 

377 If not given the smallest flow area is used that points to a `ConnectedFlowObject`. 

378 flow_length : float | int | str 

379 Defining the length that the flow takes. 

380 Can be a string: 

381 x: delta_x, y: delta_y, z: delta_z 

382 If not given the matching one for the flow area is used. 

383 temperature_derivative : float | int | np.number 

384 The temperature derivative of the node. 

385 flow_velocity : float | int | np.number 

386 The flow velocity of the node. 

387 It doesn't matter which sign it has. Only the absolute value is used. 

388 """ 

389 ConnectedFlowObject.__init__(self) 

390 Node.__init__( 

391 self, 

392 material=material, 

393 temperature=temperature, 

394 position=position, 

395 temperature_derivative=temperature_derivative, 

396 rc_objects=rc_objects, 

397 rc_solution=rc_solution, 

398 **kwargs, 

399 ) 

400 # Compute area if it's given as a plane 

401 self._flow_length = None 

402 self._flow_area = None 

403 if isinstance(flow_length, str): 

404 self._flow_length = getattr(self, f"delta_{flow_length}") 

405 else: 

406 self._flow_length = flow_length 

407 if isinstance(flow_area, str): 

408 match flow_area.lower(): 

409 case "xy" | "yx": 

410 self._flow_area = self.delta_x * self.delta_y 

411 self._flow_length = self.delta_z 

412 case "xz" | "zx": 

413 self._flow_area = self.delta_x * self.delta_z 

414 self._flow_length = self.delta_y 

415 case "yz" | "zy": 

416 self._flow_area = self.delta_y * self.delta_z 

417 self._flow_length = self.delta_x 

418 case _: 

419 raise ValueError("Invalid plane. Use 'xy', 'xz', or 'yz'.") 

420 else: 

421 self._flow_area = flow_area # Assume it's given as a direct value in m². 

422 # TODO: set flow_area and flow_direction depending on each other. 

423 

424 self._flow_propagation = False 

425 

426 if sum(val is not None for val in [flow_velocity, mass_flow, volume_flow]) != 1: 

427 self._flow_propagation = True 

428 

429 self._passed_flow_velocity = flow_velocity 

430 

431 if volume_flow is not None: 

432 self._volume_flow = volume_flow 

433 elif flow_velocity is not None: 

434 # calculate later because the area isn't known until all connections were made 

435 self._volume_flow = None 

436 elif mass_flow is not None: 

437 self._volume_flow = mass_flow / self.material.density 

438 

439 # Initialize cache 

440 self._velocity = None 

441 self._mass_flow = None 

442 self._flow_direction = None 

443 

444 def courant_number(self, time_step: float): 

445 """ 

446 Calculates the courant number of the node. 

447 

448 Parameters 

449 ---------- 

450 time_step : float | int | np.number 

451 The (maximum) time step used for calculation. 

452 

453 Returns 

454 ------- 

455 float | np.number : 

456 The courant number. 

457 """ 

458 return self.velocity * time_step / self.flow_length 

459 

460 @property 

461 def flow_length(self) -> float: 

462 """ 

463 The effective length of the channel. Use only if no asking_node is given! 

464 

465 This assumes that each ChannelNode is connected to at least one solid node at its sides. This node is used to 

466 get the perpendicular direction and define which sides defining the length. 

467 

468 Returns 

469 ------- 

470 float : 

471 The length, but it can be wrong in few cases. 

472 """ 

473 if self._flow_length is None: 

474 # request flow_area and calculate the value in it 

475 _ = self.flow_area 

476 return self._flow_length 

477 

478 @property 

479 def flow_area(self): 

480 if self._flow_area is None or self._flow_length is None: 

481 # get the areas between self and ConnectedFlowObjects and take the smallest one. 

482 result = np.inf 

483 flow_length = 0 

484 flow_obj: MassFlowNode | BoundaryCondition 

485 for flow_obj in self.mass_flow_neighbours: 

486 area = self.get_area(flow_obj) 

487 if result > area: 

488 result = area 

489 flow_length = self.get_direction(flow_obj) * self.delta 

490 self._flow_area = result 

491 self._flow_length = np.linalg.norm(flow_length) 

492 return self._flow_area 

493 

494 @property 

495 def volume_flow(self): 

496 """Returns the cached volume flow rate (m³/s).""" 

497 if self._flow_propagation and self._volume_flow is None: 

498 self.propagate_flow() 

499 # volume flow should be set now 

500 self._flow_propagation = False 

501 if self._volume_flow is None: 

502 if self._passed_flow_velocity: 

503 self._volume_flow = self._passed_flow_velocity * self.flow_area 

504 else: 

505 raise ValueError("This shouldn't have happened!") 

506 return self._volume_flow 

507 

508 @property 

509 def velocity(self): 

510 """Returns the cached flow velocity (m/s) or calculates it if needed.""" 

511 if self._velocity is None: 

512 self._velocity = self.volume_flow / self.flow_area 

513 return self._velocity 

514 

515 @property 

516 def mass_flow(self): 

517 """Returns the cached mass flow rate (kg/s) or calculates it if needed.""" 

518 if self._mass_flow is None: 

519 self._mass_flow = self.volume_flow * self.material.density 

520 return self._mass_flow 

521 

522 def reset_properties(self): 

523 """Clears cached flow velocity and mass flow to force recalculation.""" 

524 super().reset_properties() 

525 self._velocity = None 

526 self._mass_flow = None 

527 

528 @property 

529 def sources(self) -> list[Capacitor | Node]: 

530 """ 

531 Returns a list with all nodes where volume flow is flowing into the node. 

532 

533 No check is performed which node type is put into the list, but it should only be `MassFlowNode` s and its 

534 descendants or `FlowBoundaryCondition` s. 

535 

536 Returns 

537 ------- 

538 list : 

539 All nodes serving as volume flow source. 

540 """ 

541 result = [] 

542 seen = set() 

543 resistor: MassTransport 

544 for resistor in self.connected_mass_transport_resistors(): 

545 if resistor.sink == self: 

546 connected = resistor.get_connected_node(self) 

547 if connected not in seen: 

548 seen.add(connected) 

549 result.append(connected) 

550 return result 

551 

552 @property 

553 def sinks(self) -> list[MassFlowNode | FlowBoundaryCondition]: 

554 """ 

555 Returns a list of all `MassFlowNode` s into which the flow is flowing. 

556 

557 No check is performed which node type is put into the list, but it should only be `MassFlowNode` s and its 

558 descendants or `FlowBoundaryCondition` s. 

559 

560 Returns 

561 ------- 

562 list : 

563 All nodes serving as volume flow sink. 

564 """ 

565 result = [] 

566 seen = set() 

567 resistor: MassTransport 

568 for resistor in self.connected_mass_transport_resistors(): 

569 if resistor.source == self: 

570 connected = resistor.get_connected_node(self) 

571 if connected not in seen: 

572 seen.add(connected) 

573 result.append(connected) 

574 return result 

575 

576 @property 

577 def diameter(self) -> float: 

578 """ 

579 Returns the hydraulic diameter. 

580 

581 Returns 

582 ------- 

583 float : 

584 The hydraulic diameter of the node. 

585 """ 

586 len1, len2 = self.get_minimal_perpendicular_lengths_to_flow() 

587 len1 = float(len1) 

588 len2 = float(len2) 

589 # return hydraulic diameter 

590 return 2 * len1 * len2 / (len1 + len2) 

591 

592 def get_resistor_volume_flow(self, resistor: MassTransport): 

593 """ 

594 Returns the volume flow for the given resistor using mass balance. 

595 

596 Parameters 

597 ---------- 

598 resistor : MassTransport 

599 The `Resistor` to determine the volume flow for. 

600 

601 Returns 

602 ------- 

603 float : 

604 The volume flow of the given `Resistor`. 

605 """ 

606 result = 0 

607 other_node_on_resistor = resistor.get_connected_node(self) 

608 for sink in self.sinks: 

609 if sink != other_node_on_resistor: 

610 result -= sink.volume_flow 

611 source: ConnectedFlowObject 

612 for source in self.sources: 

613 if source != other_node_on_resistor: 

614 result += source.volume_flow 

615 

616 # if other_node_on_resistor is in sinks, the volume flow is positive (if it flows from source to sink) and 

617 # vice versa: if in sources, the volume flow is negative 

618 if other_node_on_resistor in self.sources: 

619 result *= -1 

620 return result 

621 

622 @property 

623 def start_nodes(self) -> list: 

624 """ 

625 Returns all nodes that are defined as a start of the mass flow. 

626 

627 Used for the algorithm to set the direction of the mass flow (make_velocity_direction). 

628 

629 Returns 

630 ------- 

631 list : 

632 A list with all mass flow start BoundaryConditions. 

633 """ 

634 result = [] 

635 element: ObjectWithPorts 

636 for element in self.rc_objects.all: 

637 if isinstance(element, BoundaryCondition): 

638 if element.is_mass_flow_start: 

639 result.append(element) 

640 return result 

641 

642 @property 

643 def mass_flow_neighbours(self): 

644 """ 

645 Returns all neighbour `MassFlowNode` s. 

646 

647 Returns 

648 ------- 

649 list : 

650 The list with all neighbour `MassFlowNode` s 

651 """ 

652 return [*self.sinks, *self.sources] 

653 

654 @property 

655 def balance(self): 

656 return calculate_balance_for_resistors(self, self.connected_mass_transport_resistors()) 

657 

658 def check_balance(self) -> bool: 

659 """ 

660 Returns True if every parent `ConnectedFlowObject` has a balanced volume flow. 

661 

662 This implies that the algorithm to set the volume flow always walks from the flow start to its end and not 

663 the other way around. 

664 

665 Returns 

666 ------- 

667 bool : 

668 True if every parent `ConnectedFlowObject` has a balanced volume flow. False otherwise. 

669 """ 

670 if self.volume_flow_is_balanced: 

671 return True 

672 # walk the flow upwards and check every source if they are balanced. 

673 source: ConnectedFlowObject 

674 for source in self.sources: 

675 if source.check_balance(): 

676 continue 

677 else: 

678 return False 

679 # check if self is balanced 

680 balance = self.balance 

681 

682 if np.isclose(balance, 0): 

683 self.volume_flow_is_balanced = True 

684 return True 

685 return False 

686 

687 @property 

688 def flow_direction(self) -> list[np.ndarray]: 

689 result = [] 

690 if self._flow_direction is None: 

691 flow_direction = np.array((0, 0, 0)) 

692 for neighbour in self.mass_flow_neighbours: 

693 flow_direction += np.abs(neighbour.get_direction(self)) 

694 for i, value in enumerate(flow_direction): 

695 if value > 0: 

696 result.append(np.zeros(3)) 

697 result[-1][i] = 1 

698 self._flow_direction = flow_direction 

699 return self._flow_direction 

700 

701 def get_minimal_perpendicular_lengths_to_flow(self) -> tuple: 

702 flow_direction: list = self.flow_direction 

703 

704 if len(flow_direction) > 1: 

705 minima = np.partition(self.delta, 1)[0:2] 

706 return minima[0], minima[1] 

707 

708 mask_vector = np.array([1, 1, 1]) - np.abs(flow_direction[0]) 

709 

710 masked_array = np.ma.array(self.delta, mask=mask_vector) 

711 # invert mask of masked array 

712 masked_array = np.ma.array(masked_array.data, mask=~masked_array.mask) 

713 return tuple(masked_array.compressed()) 

714 

715 def get_perpendicular_length_parallel_to_flow(self, asking_node: Node): 

716 """ 

717 Returns the length that is perpendicular to the connection asking_node - self and is parallel to the flow. 

718 

719 Used for the effective area between a Solid and a MassFlowNode as well as to get the length of ChannelNodes. 

720 

721 Parameters 

722 ---------- 

723 asking_node : Node 

724 

725 Returns 

726 ------- 

727 float : 

728 The length. 

729 """ 

730 first_vector = self.mass_flow_neighbours[0].get_direction(self) 

731 

732 second_vector = self.get_direction(asking_node) 

733 

734 # make cross product to get the third vector 

735 third_vector = np.abs(np.cross(first_vector, second_vector)) 

736 

737 return float((third_vector.reshape(1, 3) @ self.delta.reshape(3, 1)).item()) 

738 

739 def connected_mass_transport_resistors(self) -> list[MassTransport]: 

740 """ 

741 Returns all connected mass transport resistors. 

742 

743 Returns 

744 ------- 

745 list[MassTransport] : 

746 The list. 

747 """ 

748 from pyrc.core.resistors import MassTransport 

749 

750 result = [] 

751 for neighbour in self.neighbours: 

752 if isinstance(neighbour, MassTransport): 

753 result.append(neighbour) 

754 return result 

755 

756 @property 

757 def mass_transport_resistors(self) -> list: 

758 """ 

759 Returns a list with all MassTransport resistors in the network. 

760 

761 Returns 

762 ------- 

763 list : 

764 The list with all MassTransport resistors in the network. 

765 """ 

766 from pyrc.core.resistors import MassTransport 

767 

768 return self.rc_objects.get_all_objects(MassTransport) 

769 

770 @property 

771 def mass_flow_nodes(self) -> list: 

772 """ 

773 Returns a list with all MassTransport resistors in the network. 

774 

775 Returns 

776 ------- 

777 list : 

778 The list with all MassTransport resistors in the network. 

779 """ 

780 from pyrc.core.resistors import MassFlowNode 

781 

782 return self.rc_objects.get_all_objects(MassFlowNode) 

783 

784 # def make_velocity_direction(self): 

785 # """ 

786 # Determines and sets the source and sink for each resistor in a network using iterative DFS. 

787 # """ 

788 # from pyrc.core.resistors import MassTransport 

789 # resistors: list[MassTransport] = self.mass_transport_resistors 

790 # start_nodes = self.start_nodes 

791 # 

792 # graph = {} 

793 # resistor_map = {} # Maps node pairs (unordered) to resistors 

794 # 

795 # # Step 1: Build adjacency list and resistor mapping 

796 # for resistor in resistors: 

797 # u, v = resistor.neighbours # Each resistor connects exactly two nodes 

798 # key = frozenset((u, v)) # Store as an unordered key 

799 # 

800 # graph[u].append(v) 

801 # graph[v].append(u) 

802 # resistor_map[key] = resistor # Single entry instead of two 

803 # 

804 # # Step 2: Start iterative DFS from each start node 

805 # for start_node in start_nodes: 

806 # stack = [(start_node, None)] 

807 # visited = set() 

808 # 

809 # while stack: 

810 # node, parent = stack.pop() 

811 # if node in visited: 

812 # continue 

813 # visited.add(node) 

814 # 

815 # for neighbor in graph[node]: 

816 # if neighbor == parent: 

817 # continue 

818 # key = frozenset((node, neighbor)) 

819 # resistor = resistor_map[key] 

820 # resistor.source = node 

821 # resistor.sink = neighbor 

822 # stack.append((neighbor, node)) 

823 # 

824 # # Ensure all resistors have source and sink set 

825 # for resistor in resistors: 

826 # if resistor.source is None or resistor.sink is None: 

827 # raise ValueError( 

828 # f"Resistor between {resistor.neighbours[0]} and {resistor.neighbours[1]} is not properly set.") 

829 

830 def propagate_flow(self): 

831 """ 

832 Propagates the volume flow through all connected `ConnectedFlowObject` s from start to end. 

833 

834 It considers already set volume flows on its way to the end, but until now there is now backflow allowed. So 

835 if the set volume flow of an sink exceeds the balance it will be overwritten. 

836 

837 Only works if the volume flow is set at the start `FlowBoundaryCondition`! It walks from the start to the end 

838 and not the other way round. 

839 """ 

840 start_nodes = self.start_nodes 

841 queue = deque(start_nodes) 

842 considered = set(start_nodes) 

843 

844 while queue: 

845 node: ConnectedFlowCapacitorObject = queue.popleft() 

846 

847 if node.guess_volume_flow is None: 

848 warnings.warn("That shouldn't have happened.") 

849 continue # Skip nodes without a defined volume flow 

850 node.volume_flow_is_balanced = True 

851 

852 sink_nodes: list[ConnectedFlowObject] = [n for n in node.sinks if n not in considered] 

853 if not sink_nodes: 

854 # dead end. 

855 continue 

856 

857 defined_sinks = [n for n in sink_nodes if n.volume_flow_is_balanced] 

858 undefined_sinks = [n for n in sink_nodes if not n.volume_flow_is_balanced] 

859 

860 if len(defined_sinks) > 0: 

861 total_defined_flow = sum(n.guess_volume_flow for n in defined_sinks) 

862 remaining_flow = node.guess_volume_flow - total_defined_flow 

863 if np.isclose(remaining_flow, 0): 

864 remaining_flow = 0 

865 if remaining_flow < 0: 

866 # overwrite all incorrect sink volume flows with equal distribution 

867 # TODO: Maybe allow negative volume flows (back-flows)? 

868 print("Warning: Set volume flow is overwritten to create no backflow.") 

869 undefined_sinks.extend(defined_sinks) 

870 defined_sinks = [] 

871 for sink in defined_sinks: 

872 if sink not in considered: 

873 # TODO: check if sources are all having a defined volume flow (this would save circle circuits) 

874 add_sink_to_queue = True 

875 for source in sink.sources: 

876 if source.volume_flow_is_balanced: 

877 continue 

878 else: 

879 add_sink_to_queue = False 

880 break 

881 if add_sink_to_queue: 

882 queue.append(sink) 

883 considered.add(sink) 

884 else: 

885 remaining_flow = node.guess_volume_flow 

886 

887 if len(undefined_sinks) > 0: 

888 equal_flow = remaining_flow / len(undefined_sinks) 

889 for sink in undefined_sinks: 

890 if sink not in considered: 

891 if sink.guess_volume_flow is None: 

892 sink._volume_flow = 0 # Initialize if undefined 

893 

894 sink._volume_flow += equal_flow 

895 # set volume flow in resistor in between 

896 resistor = node.get_mass_transport_to_node(sink) 

897 resistor._volume_flow = equal_flow 

898 

899 if all(s.volume_flow_is_balanced for s in sink.sources): 

900 queue.append(sink) 

901 considered.add(sink) 

902 

903 self.set_remaining_resistor_volume_flow() 

904 

905 def set_remaining_resistor_volume_flow(self): 

906 """ 

907 Sets all unset volume flows of all `MassTransport` `Resistors`. 

908 

909 The most of them should have been set already in `self.propagate_flow()`. 

910 

911 """ 

912 resistors = [r for r in self.mass_transport_resistors if r.guess_volume_flow is None] 

913 max_attempts = len(resistors) 

914 queue = deque(resistors) 

915 attempts = {resistor: 0 for resistor in resistors} # keep track how often the resistor is re-added to the queue 

916 

917 resistor: MassTransport 

918 while queue: 

919 resistor = queue.popleft() 

920 

921 node: ConnectedFlowCapacitorObject 

922 for node in resistor.neighbours: 

923 # First: check if node only has two resistors connected. If so, set volume_flow of node as own one. 

924 if len(node.sinks) == 1 and len(node.sources) == 1: 

925 resistor._volume_flow = node.volume_flow 

926 break # go to next resistor 

927 else: # Only executed if the first loop did not break 

928 node: ConnectedFlowCapacitorObject 

929 from pyrc.core.resistors import MassTransport 

930 

931 for node in resistor.neighbours: 

932 # Determine the volume_flow balance of the node using the resistor.guess_volume_flow 

933 # At least 'resistor' has no volume_flow set. If it is the only one, it can be calculated. 

934 # Otherwise, resistor is added back to queue. 

935 

936 defined_resistors = {"sink": [], "source": []} 

937 undefined_resistors = {"sink": [], "source": []} 

938 

939 connected_resistors = [n for n in node.neighbours if isinstance(n, MassTransport)] 

940 res: MassTransport 

941 for res in connected_resistors: 

942 if res.guess_volume_flow is None: 

943 if res.sink == node: 

944 # resistor is source for node 

945 undefined_resistors["source"].append(res) 

946 else: 

947 # resistor is sink for node 

948 undefined_resistors["sink"].append(res) 

949 else: 

950 if res.sink == node: 

951 # resistor is source for node 

952 defined_resistors["source"].append(res) 

953 else: 

954 # resistor is sink for node 

955 defined_resistors["sink"].append(res) 

956 

957 def calculate_balance(def_resistors: dict): 

958 result = 0 

959 for def_res in def_resistors["sink"]: 

960 result -= def_res.guess_volume_flow 

961 for def_res in def_resistors["source"]: 

962 result += def_res.guess_volume_flow 

963 return result 

964 

965 number_undefined_res = len(undefined_resistors["sink"]) + len(undefined_resistors["source"]) 

966 if number_undefined_res > 1: 

967 continue # Volume flow cannot be determined --> check next node or go to 'else' 

968 else: 

969 if number_undefined_res == 1: 

970 # Can only be resistor that isn't defined so its volume flow is set using the balance of 

971 # the node. 

972 resistor._volume_flow = calculate_balance(defined_resistors) 

973 break 

974 elif number_undefined_res == 0: 

975 break 

976 else: 

977 # add resistor back to queue (if for loop did break!) 

978 if attempts[resistor] >= max_attempts: 

979 raise RuntimeError( 

980 f"Could not determine attribute for {resistor} after {max_attempts} attempts." 

981 ) 

982 attempts[resistor] += 1 

983 queue.append(resistor) 

984 

985 

986class ChannelNode(MassFlowNode): 

987 @property 

988 def flow_length(self) -> float: 

989 """ 

990 The effective length of the channel. Use only if no asking_node is given! 

991 

992 This assumes that each ChannelNode is connected to at least one solid node at its sides. This node is used to 

993 get the perpendicular direction and define which sides defining the length. 

994 

995 Returns 

996 ------- 

997 float : 

998 The length, but it can be wrong in few cases. 

999 """ 

1000 _ = super().flow_area 

1001 return self._flow_length 

1002 

1003 @property 

1004 def flow_area(self): 

1005 self._flow_area = self.diameter**2 / 4 * np.pi 

1006 return self._flow_area 

1007 

1008 @property 

1009 def diameter(self) -> float: 

1010 """ 

1011 Returns the diameter by using neighbour nodes that aren't ChannelNodes. 

1012 

1013 This assumes that each ChannelNode is connected to at least one solid node at its sides. This node is used to 

1014 get the perpendicular direction and define which sides defining the diameter. 

1015 

1016 Returns 

1017 ------- 

1018 float : 

1019 The diameter. 

1020 """ 

1021 from pyrc.core.resistors import MassTransport 

1022 

1023 neighbour: Resistor 

1024 for neighbour in self.neighbours: 

1025 if not isinstance(neighbour, MassTransport): 

1026 asking_node = neighbour.get_connected_node(self) 

1027 if not isinstance(asking_node, MassFlowNode): 

1028 return self.get_smaller_edge(asking_node) 

1029 

1030 return np.nan 

1031 

1032 @property 

1033 def volume(self) -> float | int: 

1034 """ 

1035 Returns the volume of the circular air channel. 

1036 

1037 Returns 

1038 ------- 

1039 float | int | np.number : 

1040 The volume of the air channel. 

1041 """ 

1042 return self.flow_area * self.flow_length 

1043 

1044 @property 

1045 def circumference(self): 

1046 return self.diameter * np.pi 

1047 

1048 def get_pipe_area(self, asking_node: Node | BoundaryCondition) -> float | int | np.number: 

1049 """ 

1050 Return the effective area of this ChannelNode. 

1051 

1052 Parameters 

1053 ---------- 

1054 asking_node : Node | BoundaryCondition 

1055 The Node that asks for the area of this Node. It is used to calculate the shift in position and to return 

1056 the associated surface area accordingly. 

1057 

1058 Returns 

1059 ------- 

1060 float | int | np.number : 

1061 The area of the surface of the cell that has the normal vector of the line you get connecting both `self` 

1062 and `asking_node`. 

1063 """ 

1064 connected_mass_transport_resistors = self.mass_transport_resistors 

1065 

1066 if isinstance(asking_node, ConnectedFlowObject): 

1067 return self.flow_area 

1068 

1069 if len(connected_mass_transport_resistors) < 3: 

1070 return self.circumference * self.flow_length / 2 

1071 else: 

1072 return self.circumference * self.flow_length / len(connected_mass_transport_resistors) 

1073 

1074 # def volume_flow(self, *args, **kwargs) -> float | int | np.number: 

1075 # """ 

1076 # Returns the volume flow for the asking Resistor using a diameter that has the same value as the cell 

1077 # height/width/depth. 

1078 # 

1079 # The length of which edge of the cell is used is determined by the asking resistor. 

1080 # 

1081 # Returns 

1082 # ------- 

1083 # float | int | np.number : 

1084 # The volume flow of the `MassFlowNode` in m^3/s. 

1085 # """ 

1086 # return self.flow_area * self.velocity 

1087 

1088 

1089class AirNode(MassFlowNode): 

1090 

1091 def __init__(self, 

1092 temperature: float | int | np.number, 

1093 position: np.ndarray, 

1094 temperature_derivative: float | int | np.number = 0, 

1095 rc_objects: RCObjects = initial_rc_objects, 

1096 rc_solution: RCSolution = solution_object, 

1097 **kwargs): 

1098 super().__init__(material=Air(), 

1099 temperature=temperature, 

1100 position=position, 

1101 temperature_derivative=temperature_derivative, 

1102 rc_objects=rc_objects, 

1103 rc_solution=rc_solution, 

1104 **kwargs)