Coverage for pyrc \ postprocessing \ heat.py: 13%

149 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-13 16:59 +0200

1# ------------------------------------------------------------------------------- 

2# Copyright (C) 2026 Joel Kimmich, Tim Jourdan 

3# ------------------------------------------------------------------------------ 

4# License 

5# This file is part of PyRC, distributed under GPL-3.0-or-later. 

6# ------------------------------------------------------------------------------ 

7from __future__ import annotations 

8 

9from datetime import datetime 

10from typing import TYPE_CHECKING 

11 

12import numpy as np 

13from matplotlib import pyplot as plt 

14 

15from pyrc.core.components.templates import solution_object 

16from pyrc.visualization import seconds_to_dates, TimePlot 

17 

18if TYPE_CHECKING: 

19 from pyrc.core.nodes import Node, MassFlowNode, ChannelNode 

20 from pyrc.core.components.capacitor import Capacitor 

21 from pyrc.core.components.resistor import Resistor 

22 from pyrc.core.inputs import BoundaryCondition, InternalHeatSource 

23 from pyrc.core.components.templates import RCSolution 

24 

25 

26def get_accumulated_heat_flux(node: Capacitor): 

27 """ 

28 Sums up the heat flux of all connected resistors. 

29 

30 Used to get the heat flux that is brought in through `BoundaryCondition` s. 

31 

32 Parameters 

33 ---------- 

34 node : NoteTemplate 

35 The Node of which the heat flux is being calculated. 

36 For this, all connected resistors are summed up. 

37 

38 Returns 

39 ------- 

40 float : 

41 The heat flux of all connected resistors. 

42 """ 

43 resistors: Resistor = node.neighbours 

44 seen_nodes = set() 

45 resistors_without_parallel = [] 

46 for resistor in resistors: 

47 connected_node = resistor.get_connected_node(node) 

48 if connected_node not in seen_nodes: 

49 seen_nodes.add(connected_node) 

50 resistors_without_parallel.append(resistor) 

51 

52 heat_flux = [r.heat_flux(node) for r in resistors_without_parallel] 

53 

54 return sum(heat_flux) 

55 

56 

57class HeatFlux: 

58 def __init__(self, rc_solution: RCSolution = solution_object): 

59 self.rc_solution = rc_solution 

60 

61 def parse_time_step_index(self, time_step_index): 

62 if time_step_index is None: 

63 time_step_index = list(range(len(self.rc_solution.time_steps))) 

64 elif ( 

65 not isinstance(time_step_index, list) 

66 and not isinstance(time_step_index, slice) 

67 and not isinstance(time_step_index, np.ndarray) 

68 ): 

69 time_step_index = [time_step_index] 

70 return time_step_index 

71 

72 def boundary(self, boundary: BoundaryCondition, time_step_index=None): 

73 resistors: Resistor = boundary.neighbours 

74 

75 time_step_index = self.parse_time_step_index(time_step_index) 

76 

77 bc_temp = self.rc_solution.input_vectors[time_step_index, boundary.index] 

78 heat_flux_sum = np.zeros_like(bc_temp) 

79 

80 for resistor in resistors: 

81 other_node_index = resistor.get_connected_node(boundary).index 

82 node_temp = self.rc_solution.result_vectors[time_step_index, other_node_index] 

83 heat_flux_sum += 1 / resistor.resistance * (bc_temp - node_temp) 

84 return self.rc_solution.time_steps[time_step_index], heat_flux_sum 

85 

86 def preheat(self, distributor: MassFlowNode, collector: MassFlowNode): 

87 """ 

88 Returns the preheat in Kelvin. 

89 

90 Returns 

91 ------- 

92 np.ndarray 

93 """ 

94 time_step_index = self.parse_time_step_index(None) 

95 temp_dist: np.ndarray = self.rc_solution.result_vectors[time_step_index, distributor.index] 

96 temp_col: np.ndarray = self.rc_solution.result_vectors[time_step_index, collector.index] 

97 return temp_col - temp_dist 

98 

99 def calculate_heat_flux(self, node, resistors: list | Resistor, time_step_index=None) -> np.ndarray | float: 

100 """ 

101 Returns the heat flux going into the node by the passed resistors. 

102 

103 Parameters 

104 ---------- 

105 node : NoteTemplate 

106 The Node of which the heat flux is being calculated. 

107 resistors : list | Resistor 

108 The resistors used to calculate the heat flow. 

109 time_step_index : int | slice | list, optional 

110 The time steps that should be calculated. 

111 If None, all are calculated. 

112 

113 Returns 

114 ------- 

115 float | np.ndarray : 

116 The heat fluxes for every time step. 

117 """ 

118 from pyrc.core.components.resistor import Resistor 

119 

120 if isinstance(resistors, Resistor): 

121 resistors = [resistors] 

122 time_step_index = self.parse_time_step_index(time_step_index) 

123 

124 result = 0 

125 node_temp = self.rc_solution.result_vectors[time_step_index, node.index] 

126 

127 from pyrc.core.components.input import Input 

128 

129 for resistor in resistors: 

130 other_node = resistor.get_connected_node(node) 

131 if isinstance(other_node, Input): 

132 result += ( 

133 1 

134 / resistor.equivalent_resistance 

135 * (self.rc_solution.input_vectors[time_step_index, other_node.index] - node_temp) 

136 ) 

137 else: 

138 result += ( 

139 1 

140 / resistor.equivalent_resistance 

141 * (self.rc_solution.result_vectors[time_step_index, other_node.index] - node_temp) 

142 ) 

143 return result 

144 

145 def internal_heat_source(self, ihc: InternalHeatSource | list, time_step_index=None): 

146 if not isinstance(ihc, list): 

147 ihc = [ihc] 

148 ihc_index = [i.index for i in ihc] 

149 time_step_index = self.parse_time_step_index(time_step_index) 

150 ihc_values = self.rc_solution.input_vectors[time_step_index, :][:, ihc_index] 

151 return self.rc_solution.time_steps[time_step_index], ihc_values 

152 

153 def heat_flux_directions( 

154 self, 

155 nodes: list[Node], 

156 directions: list | np.ndarray = np.array([0, 0, -1]), 

157 except_resistor_types=None, 

158 time_step_index=None, 

159 ) -> tuple | np.ndarray: 

160 """ 

161 Returns the heat flux through the layer in the desired direction of all nodes in the given list. 

162 

163 E.g. if the direction is (0,0,1) the heat flux in positive z direction is calculated. The heat flux is 

164 positive if going in the same direction as the desired one. 

165 

166 Parameters 

167 ---------- 

168 nodes : list[Node] 

169 The Nodes of which the heat flux is being calculated. 

170 directions : list | np.ndarray, optional 

171 The direction(s) in which the heat flux is calculated. 

172 Can be a list then each direction is calculated and returned. 

173 except_resistor_types 

174 time_step_index 

175 

176 Returns 

177 ------- 

178 np.ndarray | tuple : 

179 The result of one direction as np.ndarray or the result of all directions as tuple. 

180 """ 

181 if except_resistor_types is None: 

182 except_resistor_types = [] 

183 if isinstance(directions, np.ndarray): 

184 directions = [directions] 

185 results = [] 

186 for direction in directions: 

187 nodes_and_resistors = [ 

188 (node, node.resistors_in_direction_filtered(direction, except_resistor_types=except_resistor_types)) 

189 for node in nodes 

190 if node is not None 

191 ] 

192 

193 result = 0 

194 for node, resistors in nodes_and_resistors: 

195 result += self.calculate_heat_flux(node, resistors, time_step_index) 

196 results.append(result) 

197 

198 if len(results) > 1: 

199 return tuple(results) 

200 return results[0] 

201 

202 

203def fluxes_data( 

204 boundaries: list = None, 

205 boundaries_sum: list = None, 

206 time_step_index=None, 

207 balance: bool = False, 

208 rc_solution: RCSolution = None, 

209): 

210 if rc_solution is not None: 

211 heat_flux = HeatFlux(rc_solution=rc_solution) 

212 else: 

213 heat_flux = HeatFlux() 

214 

215 results = [] 

216 labels = [] 

217 

218 sum_vector = None 

219 time_steps = None 

220 

221 def add_to_sum(result: np.ndarray, _sum_vector): 

222 if _sum_vector is None: 

223 _sum_vector = np.zeros_like(result) 

224 return _sum_vector + result 

225 

226 if boundaries is not None: 

227 for bc in boundaries: 

228 time_steps, y = heat_flux.boundary(bc, time_step_index) 

229 results.append(y) 

230 labels.append(bc.__class__.__name__) 

231 sum_vector = add_to_sum(y, sum_vector) 

232 if boundaries_sum is not None: 

233 if not isinstance(boundaries_sum[0], list): 

234 boundaries_sum = [boundaries_sum] 

235 for i, bc_groups in enumerate(boundaries_sum): 

236 time_steps, bc_sum = heat_flux.internal_heat_source(bc_groups[0], time_step_index) 

237 for bc in bc_groups[1:]: 

238 _, bc_add_on = heat_flux.internal_heat_source(bc, time_step_index) 

239 bc_sum += bc_add_on 

240 results.append(bc_sum) 

241 labels.append(f"{bc_groups[0].__class__.__name__}s: {i}") 

242 sum_vector = add_to_sum(bc_sum.reshape(-1, 1), sum_vector.reshape(-1, 1)) 

243 if balance and time_steps is not None and sum_vector is not None: 

244 results.append(sum_vector) 

245 labels.append("Balance") 

246 return time_steps, results, labels 

247 

248 

249def plot_channel_balance( 

250 channel_nodes: list[Node] | ChannelNode, 

251 distributor: MassFlowNode, 

252 collector: MassFlowNode, 

253 time_step_index=None, 

254 start_date=datetime(2023, 1, 1), 

255 y_scale=1, 

256): 

257 """ 

258 Plots the balance of the given channel nodes. 

259 

260 Parameters 

261 ---------- 

262 channel_nodes : list[ChannelNode] | ChannelNode 

263 The channel nodes to plot the sum of. 

264 distributor : MassFlowNode 

265 The distributor before the channel nodes. 

266 Used to calculate the heat flux that goes into the mass flow within the channel nodes. 

267 collector : MassFlowNode 

268 The collector after the channel nodes. 

269 Used to calculate the heat flux that goes into the mass flow within the channel nodes. 

270 time_step_index : int | slice | list, optional 

271 The time steps that should be calculated. 

272 

273 """ 

274 from pyrc.core.nodes import ChannelNode 

275 

276 if isinstance(channel_nodes, ChannelNode): 

277 channel_nodes = [channel_nodes] 

278 heat_flux = HeatFlux() 

279 time_step_index = heat_flux.parse_time_step_index(time_step_index) 

280 exterior_direction = np.array([0, 0, -1]) 

281 interior_direction = np.array([0, 0, 1]) 

282 upper_direction = np.array([0, -1, 0]) 

283 lower_direction = np.array([0, 1, 0]) 

284 ex, interior, up, lo = heat_flux.heat_flux_directions( 

285 channel_nodes, 

286 directions=[exterior_direction, interior_direction, upper_direction, lower_direction], 

287 time_step_index=time_step_index, 

288 ) 

289 # side_heat_flux_dist = heat_flux.heat_flux_directions([distributor], 

290 # directions=[np.array([1, 0, 0])], 

291 # time_step_index=time_step_index, 

292 # except_resistor_types=[MassTransport] 

293 # ) 

294 # side_heat_flux_col = heat_flux.heat_flux_directions([collector], 

295 # directions=[np.array([-1, 0, 0])], 

296 # time_step_index=time_step_index, 

297 # except_resistor_types=[MassTransport] 

298 # ) 

299 temp_dist = heat_flux.rc_solution.result_vectors[time_step_index, distributor.index] 

300 temp_col = heat_flux.rc_solution.result_vectors[time_step_index, collector.index] 

301 mass_flow = distributor.mass_flow 

302 spec_capacity = distributor.material.heat_capacity 

303 channel_heat_flux = mass_flow * spec_capacity * (temp_col - temp_dist) 

304 

305 x = heat_flux.rc_solution.time_steps[time_step_index] 

306 x = seconds_to_dates(x, start_date) 

307 

308 time_plot = TimePlot( 

309 x=x, 

310 ys=[ex, interior, (up + lo)], 

311 labels=["exterior", "interior", "inbetween"], 

312 y_scale=y_scale, 

313 y_title="Heat Flux / W", 

314 ) 

315 time_plot.plot_stack() 

316 time_plot.ax.plot( 

317 time_plot.x, 

318 channel_heat_flux * time_plot.y_scale, 

319 label=["channel"], 

320 color="black", 

321 linewidth=time_plot.line_width, 

322 ) 

323 time_plot.format() 

324 time_plot.show() 

325 

326 

327def plot_fluxes_accumulated( 

328 boundaries: list = None, boundaries_sum: list = None, time_step_index=None, plot_balance: bool = False 

329): 

330 x, ys, labels = fluxes_data(boundaries, boundaries_sum, time_step_index, plot_balance) 

331 

332 fig, (ax1, ax2) = plt.subplots(nrows=2, ncols=1, sharex=True) 

333 

334 for i, y in enumerate(ys): 

335 if labels[i] == "Balance": 

336 pass 

337 ax1.plot(x, y, labels[i]) 

338 

339 fig.tight_layout() 

340 ax1.set_xlabel("Time") 

341 ax1.set_ylabel("Positive Heat flux / W") 

342 ax2.set_ylabel("Negative Heat flux / W") 

343 for ax in (ax1, ax2): 

344 ax.grid(True) 

345 ax.legend() 

346 

347 plt.show()