Coverage for pyrc \ postprocessing \ heat.py: 13%
149 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-13 16:59 +0200
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-13 16:59 +0200
1# -------------------------------------------------------------------------------
2# Copyright (C) 2026 Joel Kimmich, Tim Jourdan
3# ------------------------------------------------------------------------------
4# License
5# This file is part of PyRC, distributed under GPL-3.0-or-later.
6# ------------------------------------------------------------------------------
7from __future__ import annotations
9from datetime import datetime
10from typing import TYPE_CHECKING
12import numpy as np
13from matplotlib import pyplot as plt
15from pyrc.core.components.templates import solution_object
16from pyrc.visualization import seconds_to_dates, TimePlot
18if TYPE_CHECKING:
19 from pyrc.core.nodes import Node, MassFlowNode, ChannelNode
20 from pyrc.core.components.capacitor import Capacitor
21 from pyrc.core.components.resistor import Resistor
22 from pyrc.core.inputs import BoundaryCondition, InternalHeatSource
23 from pyrc.core.components.templates import RCSolution
def get_accumulated_heat_flux(node: Capacitor):
    """
    Add up the heat flux of the resistors attached to ``node``.

    Resistors that lead to the same connected node are treated as parallel: only the first resistor
    found for each connected node contributes to the sum.

    Parameters
    ----------
    node : Capacitor
        The node whose ``neighbours`` (resistors) are evaluated.

    Returns
    -------
    float :
        Sum of ``resistor.heat_flux(node)`` over the selected resistors (``0`` if there are none).
    """
    # dict keeps insertion order, so the first resistor per connected node wins and the
    # summation order matches the order of ``node.neighbours``.
    first_resistor_per_node = {}
    for candidate in node.neighbours:
        partner = candidate.get_connected_node(node)
        first_resistor_per_node.setdefault(partner, candidate)

    return sum(resistor.heat_flux(node) for resistor in first_resistor_per_node.values())
class HeatFlux:
    """
    Evaluate heat fluxes from the data stored in an ``RCSolution``.

    The methods read ``rc_solution.time_steps``, ``rc_solution.result_vectors`` and
    ``rc_solution.input_vectors``; the two vector containers are indexed as
    ``[time_step_index, <index of node/input>]``.
    """

    def __init__(self, rc_solution: RCSolution = solution_object):
        self.rc_solution = rc_solution

    def parse_time_step_index(self, time_step_index):
        """
        Normalize a time step selection so it can be used for indexing.

        Parameters
        ----------
        time_step_index : int | slice | list | np.ndarray | None
            ``None`` selects every time step. Lists, slices and arrays are returned unchanged.
            Anything else is wrapped into a one-element list.

        Returns
        -------
        list | slice | np.ndarray :
            The normalized selection.
        """
        if time_step_index is None:
            return list(range(len(self.rc_solution.time_steps)))
        if isinstance(time_step_index, (list, slice, np.ndarray)):
            return time_step_index
        return [time_step_index]

    def boundary(self, boundary: BoundaryCondition, time_step_index=None):
        """
        Sum the heat flux between a boundary condition and all nodes connected to it.

        For every resistor in ``boundary.neighbours`` the term
        ``(T_boundary - T_connected_node) / resistor.resistance`` is added.

        Parameters
        ----------
        boundary : BoundaryCondition
            The boundary condition to evaluate.
        time_step_index : int | slice | list, optional
            The time steps that should be calculated.
            If None, all are calculated.

        Returns
        -------
        tuple :
            The selected time steps and the summed heat flux for these time steps.
        """
        connected_resistors = boundary.neighbours
        solution = self.rc_solution
        steps = self.parse_time_step_index(time_step_index)

        boundary_temperature = solution.input_vectors[steps, boundary.index]
        total = np.zeros_like(boundary_temperature)

        for link in connected_resistors:
            neighbour_index = link.get_connected_node(boundary).index
            neighbour_temperature = solution.result_vectors[steps, neighbour_index]
            conductance = 1 / link.resistance
            total += conductance * (boundary_temperature - neighbour_temperature)
        return solution.time_steps[steps], total

    def preheat(self, distributor: MassFlowNode, collector: MassFlowNode):
        """
        Return the temperature difference ``collector - distributor`` for all time steps.

        Returns
        -------
        np.ndarray
        """
        steps = self.parse_time_step_index(None)
        vectors = self.rc_solution.result_vectors
        distributor_temperature: np.ndarray = vectors[steps, distributor.index]
        collector_temperature: np.ndarray = vectors[steps, collector.index]
        return collector_temperature - distributor_temperature

    def calculate_heat_flux(self, node, resistors: list | Resistor, time_step_index=None) -> np.ndarray | float:
        """
        Return the heat flux going into ``node`` through the passed resistors.

        Each resistor contributes ``(T_other - T_node) / resistor.equivalent_resistance``. The temperature
        of the other node is taken from the input vectors if that node is an ``Input`` and from the
        result vectors otherwise.

        Parameters
        ----------
        node :
            The node of which the heat flux is being calculated.
        resistors : list | Resistor
            The resistors used to calculate the heat flow.
        time_step_index : int | slice | list, optional
            The time steps that should be calculated.
            If None, all are calculated.

        Returns
        -------
        float | np.ndarray :
            The heat fluxes for every selected time step (``0`` if no resistor is passed).
        """
        # Imported locally, as in the original implementation.
        from pyrc.core.components.resistor import Resistor
        from pyrc.core.components.input import Input

        resistor_list = [resistors] if isinstance(resistors, Resistor) else resistors
        steps = self.parse_time_step_index(time_step_index)
        own_temperature = self.rc_solution.result_vectors[steps, node.index]

        total = 0
        for link in resistor_list:
            partner = link.get_connected_node(node)
            conductance = 1 / link.equivalent_resistance
            if isinstance(partner, Input):
                temperatures = self.rc_solution.input_vectors
            else:
                temperatures = self.rc_solution.result_vectors
            total += conductance * (temperatures[steps, partner.index] - own_temperature)
        return total

    def internal_heat_source(self, ihc: InternalHeatSource | list, time_step_index=None):
        """
        Return the input values of one or several internal heat sources.

        Parameters
        ----------
        ihc : InternalHeatSource | list
            A single heat source or a list of them.
        time_step_index : int | slice | list, optional
            The time steps that should be returned.
            If None, all are returned.

        Returns
        -------
        tuple :
            The selected time steps and the matching input values (one column per heat source).
        """
        sources = ihc if isinstance(ihc, list) else [ihc]
        columns = [source.index for source in sources]
        steps = self.parse_time_step_index(time_step_index)
        # Two-step indexing: first pick the time steps, then the columns of the heat sources.
        values = self.rc_solution.input_vectors[steps, :][:, columns]
        return self.rc_solution.time_steps[steps], values

    def heat_flux_directions(
        self,
        nodes: list[Node],
        directions: list | np.ndarray = np.array([0, 0, -1]),
        except_resistor_types=None,
        time_step_index=None,
    ) -> tuple | np.ndarray:
        """
        Return the heat flux of all given nodes through the resistors lying in the desired direction(s).

        For every direction, the resistors of each node are selected with
        ``node.resistors_in_direction_filtered`` and evaluated with ``calculate_heat_flux``; the
        contributions of all nodes are added up.

        Parameters
        ----------
        nodes : list[Node]
            The nodes of which the heat flux is being calculated. ``None`` entries are skipped.
        directions : list | np.ndarray, optional
            A single direction as np.ndarray or a list of directions.
        except_resistor_types : list, optional
            Forwarded to ``resistors_in_direction_filtered``. Defaults to an empty list.
        time_step_index : int | slice | list, optional
            The time steps that should be calculated.
            If None, all are calculated.

        Returns
        -------
        np.ndarray | tuple :
            The result of one direction, or a tuple with one result per direction if more than one
            direction was evaluated.
        """
        excluded_types = [] if except_resistor_types is None else except_resistor_types
        direction_list = [directions] if isinstance(directions, np.ndarray) else directions

        per_direction = []
        for direction in direction_list:
            # Collect the resistors of all nodes first, then evaluate them.
            selection = [
                (current, current.resistors_in_direction_filtered(direction, except_resistor_types=excluded_types))
                for current in nodes
                if current is not None
            ]

            flux = 0
            for current, current_resistors in selection:
                flux += self.calculate_heat_flux(current, current_resistors, time_step_index)
            per_direction.append(flux)

        return tuple(per_direction) if len(per_direction) > 1 else per_direction[0]
def fluxes_data(
    boundaries: list = None,
    boundaries_sum: list = None,
    time_step_index=None,
    balance: bool = False,
    rc_solution: RCSolution = None,
):
    """
    Collect heat flux curves of boundary conditions and (grouped) heat sources.

    Parameters
    ----------
    boundaries : list, optional
        Objects evaluated one by one with ``HeatFlux.boundary``. Each gets its own result, labelled with
        its class name.
    boundaries_sum : list, optional
        Either one group (a flat list) or a list of groups (list of lists). Every member of a group is
        evaluated with ``HeatFlux.internal_heat_source`` and the values of a group are added up to one
        result, labelled ``"<class name of first member>s: <group number>"``.
    time_step_index : int | slice | list, optional
        The time steps that should be calculated.
        If None, all are calculated.
    balance : bool, optional
        If True, the sum of all results is appended with the label ``"Balance"``.
    rc_solution : RCSolution, optional
        The solution to read from. If None, ``HeatFlux`` is created with its default solution.

    Returns
    -------
    tuple :
        ``(time_steps, results, labels)``. ``time_steps`` is None if nothing was evaluated.
        As soon as a group of ``boundaries_sum`` is added, the balance is kept as a column vector
        (``reshape(-1, 1)``).
    """
    heat_flux = HeatFlux() if rc_solution is None else HeatFlux(rc_solution=rc_solution)

    results = []
    labels = []

    sum_vector = None
    time_steps = None

    def add_to_sum(result: np.ndarray, _sum_vector):
        # The first contribution starts the balance from zeros of matching shape and dtype.
        if _sum_vector is None:
            _sum_vector = np.zeros_like(result)
        return _sum_vector + result

    if boundaries is not None:
        for bc in boundaries:
            time_steps, y = heat_flux.boundary(bc, time_step_index)
            results.append(y)
            labels.append(bc.__class__.__name__)
            sum_vector = add_to_sum(y, sum_vector)

    # An empty list is skipped as well: ``boundaries_sum[0]`` would raise an IndexError otherwise.
    if boundaries_sum is not None and len(boundaries_sum) > 0:
        if not isinstance(boundaries_sum[0], list):
            boundaries_sum = [boundaries_sum]
        for i, bc_group in enumerate(boundaries_sum):
            time_steps, bc_sum = heat_flux.internal_heat_source(bc_group[0], time_step_index)
            for bc in bc_group[1:]:
                _, bc_add_on = heat_flux.internal_heat_source(bc, time_step_index)
                # Not in-place, so the array returned for the first member is never modified.
                bc_sum = bc_sum + bc_add_on
            results.append(bc_sum)
            labels.append(f"{bc_group[0].__class__.__name__}s: {i}")
            # Fix: ``sum_vector`` is still None if no ``boundaries`` were evaluated before; calling
            # ``.reshape`` on it raised an AttributeError. Only reshape an existing balance.
            previous_sum = None if sum_vector is None else sum_vector.reshape(-1, 1)
            sum_vector = add_to_sum(bc_sum.reshape(-1, 1), previous_sum)

    if balance and time_steps is not None and sum_vector is not None:
        results.append(sum_vector)
        labels.append("Balance")
    return time_steps, results, labels
def plot_channel_balance(
    channel_nodes: list[Node] | ChannelNode,
    distributor: MassFlowNode,
    collector: MassFlowNode,
    time_step_index=None,
    start_date=datetime(2023, 1, 1),
    y_scale=1,
):
    """
    Plots the balance of the given channel nodes.

    The heat fluxes of the channel nodes in the directions (0, 0, -1) "exterior", (0, 0, 1) "interior"
    and (0, -1, 0) + (0, 1, 0) "inbetween" are drawn as a stack plot. On top, the heat flux taken up by
    the mass flow, ``mass_flow * heat_capacity * (T_collector - T_distributor)``, is drawn as a black
    line.

    Parameters
    ----------
    channel_nodes : list[Node] | ChannelNode
        The channel nodes to plot the sum of. A single ChannelNode is wrapped into a list.
    distributor : MassFlowNode
        The distributor before the channel nodes.
        Used to calculate the heat flux that goes into the mass flow within the channel nodes.
        Its ``mass_flow`` and ``material.heat_capacity`` are used for this.
    collector : MassFlowNode
        The collector after the channel nodes.
        Used to calculate the heat flux that goes into the mass flow within the channel nodes.
    time_step_index : int | slice | list, optional
        The time steps that should be calculated.
        If None, all are calculated.
    start_date : datetime, optional
        Passed to ``seconds_to_dates`` together with the time steps to create the x values.
    y_scale : optional
        Passed to ``TimePlot``; the channel heat flux line is multiplied with the plot's ``y_scale``.
    """
    from pyrc.core.nodes import ChannelNode

    if isinstance(channel_nodes, ChannelNode):
        channel_nodes = [channel_nodes]
    heat_flux = HeatFlux()
    time_step_index = heat_flux.parse_time_step_index(time_step_index)
    exterior_direction = np.array([0, 0, -1])
    interior_direction = np.array([0, 0, 1])
    upper_direction = np.array([0, -1, 0])
    lower_direction = np.array([0, 1, 0])
    ex, interior, up, lo = heat_flux.heat_flux_directions(
        channel_nodes,
        directions=[exterior_direction, interior_direction, upper_direction, lower_direction],
        time_step_index=time_step_index,
    )
    # NOTE: the side heat fluxes at distributor and collector are currently not evaluated.
    # side_heat_flux_dist = heat_flux.heat_flux_directions([distributor],
    #                                                      directions=[np.array([1, 0, 0])],
    #                                                      time_step_index=time_step_index,
    #                                                      except_resistor_types=[MassTransport]
    #                                                      )
    # side_heat_flux_col = heat_flux.heat_flux_directions([collector],
    #                                                     directions=[np.array([-1, 0, 0])],
    #                                                     time_step_index=time_step_index,
    #                                                     except_resistor_types=[MassTransport]
    #                                                     )
    temp_dist = heat_flux.rc_solution.result_vectors[time_step_index, distributor.index]
    temp_col = heat_flux.rc_solution.result_vectors[time_step_index, collector.index]
    mass_flow = distributor.mass_flow
    spec_capacity = distributor.material.heat_capacity
    channel_heat_flux = mass_flow * spec_capacity * (temp_col - temp_dist)

    x = heat_flux.rc_solution.time_steps[time_step_index]
    x = seconds_to_dates(x, start_date)

    time_plot = TimePlot(
        x=x,
        ys=[ex, interior, (up + lo)],
        labels=["exterior", "interior", "inbetween"],
        y_scale=y_scale,
        y_title="Heat Flux / W",
    )
    time_plot.plot_stack()
    time_plot.ax.plot(
        time_plot.x,
        channel_heat_flux * time_plot.y_scale,
        # Fix: a scalar label. A one-element list for a single line shows up as "['channel']" in the
        # legend and is deprecated by Matplotlib.
        label="channel",
        color="black",
        linewidth=time_plot.line_width,
    )
    time_plot.format()
    time_plot.show()
def plot_fluxes_accumulated(
    boundaries: list = None, boundaries_sum: list = None, time_step_index=None, plot_balance: bool = False
):
    """
    Plot the heat flux curves returned by ``fluxes_data``.

    Parameters
    ----------
    boundaries : list, optional
        Forwarded to ``fluxes_data``.
    boundaries_sum : list, optional
        Forwarded to ``fluxes_data``.
    time_step_index : int | slice | list, optional
        Forwarded to ``fluxes_data``.
    plot_balance : bool, optional
        Forwarded to ``fluxes_data`` as ``balance``.
    """
    x, ys, labels = fluxes_data(boundaries, boundaries_sum, time_step_index, plot_balance)

    fig, (ax1, ax2) = plt.subplots(nrows=2, ncols=1, sharex=True)

    # NOTE(review): the original code had a no-op special case for the "Balance" curve and never draws
    # on ax2 - presumably a split into positive/negative fluxes was planned; confirm the intent.
    for label, y in zip(labels, ys):
        # Fix: the label must be passed as keyword. The third positional argument of ``plot`` is the
        # format string, so a label there raised a ValueError.
        ax1.plot(x, y, label=label)

    ax1.set_xlabel("Time")
    ax1.set_ylabel("Positive Heat flux / W")
    ax2.set_ylabel("Negative Heat flux / W")
    for ax in (ax1, ax2):
        ax.grid(True)
        # Only create a legend if there is something to show; avoids the warning on empty axes.
        handles, _ = ax.get_legend_handles_labels()
        if handles:
            ax.legend()
    # Called after the labels are set so that they are taken into account by the layout.
    fig.tight_layout()

    plt.show()