Coverage for pyrc \ core \ simulation.py: 13%

146 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-13 16:59 +0200

1# ------------------------------------------------------------------------------- 

2# Copyright (C) 2026 Joel Kimmich, Tim Jourdan 

3# ------------------------------------------------------------------------------ 

4# License 

5# This file is part of PyRC, distributed under GPL-3.0-or-later. 

6# ------------------------------------------------------------------------------ 

7 

8import multiprocessing as mp 

9import os 

10import time 

11from copy import copy 

12from typing import Any, Callable 

13 

14from pyrc.core.network import RCNetwork 

15from pyrc.core.settings import initial_settings as i_settings, Settings 

16from pyrc.core.components.templates import EquationItem, RCObjects, RCSolution 

17 

18from pyrc.tools.functions import add_leading_underscore, subtract_seconds_from_string 

19 

20 

class Simulation:
    """
    Handle one RC network simulation including pre-simulation.

    The pre-simulation determines correct initial values for the network by calculating a time range before the real
    simulation. It can be done with varying weather data or as a static simulation of the initial boundary conditions
    of the network. Because of the time dependent values of all capacities it is recommended to use the option with
    varying (realistic) boundary data.

    The last step of the pre-simulation is saved to file as initial values. It is executed only once and loaded from
    that file afterwards.
    """

    def __init__(
        self,
        network_class: type[RCNetwork] | None = None,
        network_keyword_arguments: dict[str, Any] | None = None,
        pre_calculation_seconds: int | float = 36000,
        t_span: tuple | None = None,
        name_add_on: str | None = None,
        settings: Settings | None = None,
        pre_calculation_settings: Settings | None = None,
        print_progress: bool = True,
        time_dependent_function: Callable | Any = None,
    ) -> None:
        """
        Store the defaults that ``run`` falls back to when an argument is not passed to it.

        Parameters
        ----------
        network_class : type[RCNetwork], optional
            The RCNetwork class that is instantiated for the simulation.
        network_keyword_arguments : dict[str, Any], optional
            The keyword arguments for the RCNetwork object that is created.
        pre_calculation_seconds : int | float, optional
            The simulated time span of the pre-calculation in seconds.
        t_span : tuple, optional
            The t_span for the simulation: (start, end) in seconds.
        name_add_on : str, optional
            An add-on for the file names written by the simulation.
        settings : Settings, optional
            The settings for the network and simulation.
        pre_calculation_settings : Settings, optional
            The settings for the pre-calculation.
        print_progress : bool, optional
            Whether to print some progress information during the simulation.
        time_dependent_function : Callable | Any, optional
            Passed unchanged to ``RCNetwork.solve_network``.
        """
        self.network_class = network_class
        self.network_keyword_arguments = network_keyword_arguments
        self.pre_calculation_seconds: int | float = pre_calculation_seconds
        self.t_span: tuple = t_span
        self.name_add_on: str = name_add_on
        self.settings: Settings = settings
        self.pre_calculation_settings: Settings = pre_calculation_settings
        self.print_progress: bool = print_progress
        self.time_dependent_function: Callable = time_dependent_function

        # The solved network; set by ``run``.
        self.network = None

    def run(
        self,
        network_class: type[RCNetwork] | None = None,
        network_keyword_arguments: dict[str, Any] | None = None,
        t_span: tuple | None = None,
        name_add_on: str | None = None,
        settings: Settings | None = None,
        pre_calculation_settings: Settings | None = None,
        print_progress: bool | None = None,
        time_dependent_function: Callable | Any = None,
    ) -> None:
        """
        Run the simulation including pre-simulation with the passed network type.

        Every argument that is not passed (or is falsy) falls back to the value given to the constructor. The solved
        network is stored in ``self.network``.

        Parameters
        ----------
        network_class : type[RCNetwork], optional
            The RCNetwork class that is instantiated for the simulation.
        network_keyword_arguments : dict[str, Any], optional
            The keyword arguments for the RCNetwork object that is created. The dict is not modified.
        t_span : tuple, optional
            The t_span for the simulation: (start, end) in seconds.
            It should start at 0 (otherwise it can work but it's not tested).
        name_add_on : str, optional
            An add-on for the file names to tell simulations apart.
            NOTE(review): an earlier version of this docstring stated that a random five digits integer is used if
            no add-on is given; that is not visible in this class - confirm in ``add_leading_underscore``.
        settings : Settings, optional
            The settings for the network and simulation.
            If None, the settings of the created network are used.
        pre_calculation_settings : Settings, optional
            The settings for the pre-calculation. Should only vary in the weather data start date.
            If None, the network settings are used, see ``_worker``.
        print_progress : bool, optional
            Whether to print some progress information during the simulation.
        time_dependent_function : Callable | Any, optional
            Passed unchanged to ``RCNetwork.solve_network``.

        Raises
        ------
        ValueError
            If no network class was passed, neither here nor to the constructor.
        """
        network_class = network_class or self.network_class
        if network_class is None:
            raise ValueError("No network class given: pass `network_class` to `run` or to the constructor.")

        self.network = self._worker(
            network_class=network_class,
            network_copy_dict=network_keyword_arguments or self.network_keyword_arguments or {},
            pre_calculation_seconds=self.pre_calculation_seconds,
            t_span_simulation=t_span or self.t_span,
            name_add_on=name_add_on or self.name_add_on,
            settings=settings or self.settings,
            pre_calculation_settings=pre_calculation_settings or self.pre_calculation_settings,
            print_progress=print_progress if print_progress is not None else self.print_progress,
            return_network=True,
            time_dependent_function=time_dependent_function or self.time_dependent_function,
        )

    @staticmethod
    def _worker(
        network_class: type[RCNetwork],
        network_copy_dict: dict,
        pre_calculation_seconds: int | float,
        t_span_simulation: tuple,
        name_add_on: str | None = None,
        settings: Settings | None = None,
        pre_calculation_settings: Settings | None = None,
        print_progress: bool = True,
        return_network: bool = False,
        time_dependent_function: Callable | Any = None,
    ) -> RCNetwork | None:
        """
        Run a single simulation: create the network, provide initial values and solve it.

        Remember: The network shouldn't exist / be built yet because it's not picklable. The network is created newly
        in here and if the matrices are already created they are loaded from file.
        Not only the unpicklable state of the network forces the creation of the network within this method but also
        the network dependency on both rc_objects and rc_solution objects.

        Parameters
        ----------
        network_class : type[RCNetwork]
            The RCNetwork class that is instantiated for the simulation.
        network_copy_dict : dict
            A dictionary to give RCNetwork as keyword arguments for initializing.
            This bypasses moving the unpicklable RCNetwork object to the worker and instead creates it inside the
            worker. The dict itself is not modified; ``rc_objects`` and ``rc_solution`` entries are always replaced
            by fresh instances.
        pre_calculation_seconds : int | float
            The simulated time span of the pre-calculation in seconds. It is also part of the file name of the saved
            initial values.
        t_span_simulation : tuple
            The t_span for the simulation: (start, end) in seconds.
            It should start at 0 (otherwise it can work but it's not tested).
        name_add_on : str, optional
            An add-on for the file names to tell simulations apart.
        settings : Settings, optional
            The settings for the network and simulation. A copy is assigned to the network.
            If None, the settings of the created network are used.
        pre_calculation_settings : Settings, optional
            The settings for the pre-calculation. Should only vary in the weather data start date.
            If None, the network settings are used, either with a weather start date shifted back by
            ``pre_calculation_seconds`` (if ``use_weather_data``) or with static calculation.
        print_progress : bool, optional
            Whether to print some progress information during the simulation.
        return_network : bool, optional
            If True, the solved network is returned. Keep it False when run in a separate process.
        time_dependent_function : Callable | Any, optional
            Passed unchanged to ``RCNetwork.solve_network``.

        Returns
        -------
        RCNetwork | None :
            The solved network if ``return_network`` is True, otherwise None.

        Raises
        ------
        RuntimeError
            If the initial values written by the pre-calculation cannot be loaded back in.
        """
        # Just to be safe: Create new RCObjects and RCSolution instances that are not linked to other simulations.
        # They are merged into a new dict so that the dict of the caller stays untouched.
        rc_objects = RCObjects()
        network_kwargs = {
            **network_copy_dict,
            "rc_objects": rc_objects,
            "rc_solution": RCSolution(rc_objects=rc_objects),
        }
        network: RCNetwork = network_class(**network_kwargs)
        if settings is None:
            # remember the network's own settings to restore them after the pre-calculation
            settings = copy(network.settings)
        else:
            network.settings = copy(settings)
        network.create_network()

        name_add_on = add_leading_underscore(name_add_on)

        name_prefix = os.path.join(network.settings.save_folder_path, f"{network.hash}{name_add_on}")
        single_solution_name = f"{name_prefix}_{pre_calculation_seconds}_single_solution.pickle"

        if network.load_initial_values(return_bool=True, pickle_path_single_solution=single_solution_name):
            print(f"{network.hash}: pre-calculation was loaded from file.")
        else:
            print(f"{network.hash}: starting pre-calculation")
            if pre_calculation_settings is not None:
                network.settings = pre_calculation_settings
            elif network.settings.use_weather_data:
                # start the weather data earlier so that the pre-calculation ends at the real start date
                original_date = network.settings.start_date
                network.settings.start_date = subtract_seconds_from_string(original_date, pre_calculation_seconds)
                network.settings.calculate_static = False
                print(f"Dynamic pre-calculation with weather start date: {network.settings.start_date}")
            else:
                print("Static pre-calculation.")
                network.settings.calculate_static = True

            network.solve_network(
                (0, pre_calculation_seconds),
                print_progress=print_progress,
                name_add_on=name_add_on + "_pre_calculation",
                time_dependent_function=time_dependent_function,
            )

            # save last solution to load it back in later
            network.rc_solution.save_last_step(single_solution_name)

            # delete all pre-calculation solutions in the solutions object and free space for the real simulation
            network.rc_solution.delete_solutions(confirm=True)
            network.reset_properties()

            # The loading must not live inside an ``assert``: it would be skipped when Python runs with -O.
            if not network.load_initial_values(return_bool=True, pickle_path_single_solution=single_solution_name):
                raise RuntimeError(
                    f"{network.hash}: initial values of the pre-calculation could not be loaded from "
                    f"'{single_solution_name}'."
                )
            network.settings = settings  # change back to original settings
            print(f"{network.hash}: pre-calculation done.")

        # The result is saved within network.solve_network so it doesn't need to be saved in here.
        network.solve_network(
            t_span_simulation,
            print_progress=print_progress,
            name_add_on=name_add_on,
            time_dependent_function=time_dependent_function,
        )

        if return_network:
            return network
        return None

219 

220 

class Parameterization(Simulation):
    """
    Class to handle `RCNetwork` calculations that are quite similar but differ in one settings parameter.

    All calculations are run in parallel, limited to ``max_core_number`` processes at a time.
    """

    # Position of the name add-on in the tuple returned by ``get_parameters``
    # (it matches the positional order of ``Simulation._worker``).
    _NAME_ADD_ON_INDEX = 4

    def __init__(
        self,
        parameters_tuples: list[tuple],
        pre_calculation_seconds=36000,
        initial_settings_dict: dict = i_settings,
        max_core_number=0,
        t_span=None,
    ):
        """
        Parameters
        ----------
        parameters_tuples : list[tuple]
            Defining the parameters for each simulation:
            for parameters in parameters_tuples:
                network_type: type = parameters[0]
                settings_dict = parameters[1]
                network_parameters = parameters[2]
                t_span = parameters[3]
                name_add_on = parameters[4]
        pre_calculation_seconds : int | float, optional
            How long the pre-calculation before the real calculation should be (in seconds).
        initial_settings_dict : dict, optional
            The initial settings to use for the calculations. If not given, the initial ones from ``core.settings``
            are used. Stored as ``self.settings``.
        max_core_number : int, optional
            An optional limit how many cores can be used for the calculations.
            0 (or less) for no limit. The value is capped at the number of available cores.
        t_span : tuple, optional
            (start, end) in seconds, stored as ``self.t_span``. Defaults to one year (8760 hours).
            The workers use the ``t_span`` entry of each parameters tuple.
        """
        super().__init__(pre_calculation_seconds=pre_calculation_seconds, settings=initial_settings_dict)
        self.parameters_tuples: list[tuple] = parameters_tuples

        self.t_span = (0, 8760 * 3600) if t_span is None else t_span

        available_cores = mp.cpu_count()
        if max_core_number <= 0:
            max_core_number = available_cores
        self.max_core_number: int = min(max_core_number, available_cores)

    def get_parameters(self, parameters: tuple):
        """
        Returns the first few parameters for the worker method.

        The passed tuple and the dicts in it are not modified.

        Parameters
        ----------
        parameters : tuple
            The tuple out of self.parameters_tuples

        Returns
        -------
        tuple :
            The positional arguments for ``_worker``:
            (network_type, network_parameters, pre_calculation_seconds, t_span, name_add_on, settings,
            pre_calculation_settings, print_progress) where print_progress is always False.
        """
        network_type: type = parameters[0]
        settings_dict = parameters[1]
        t_span = parameters[3]
        name_add_on = parameters[4]

        settings = Settings(**settings_dict)
        # work on a copy so that the dict handed in by the user is not altered
        network_parameters = {
            **parameters[2],
            "settings": settings,
            "load_from_pickle": True,
            "save_to_pickle": True,
            "num_cores_jacobian": 1,
            "rc_objects": RCObjects(),
            "rc_solution": RCSolution(),
        }

        # The pre-calculation uses the same settings but starts earlier, so that it ends at the real start date.
        # If no start date is given, "2022-01-01T00:00:00" is assumed
        # (NOTE(review): presumably the default of Settings - confirm).
        original_date = settings_dict.get("start_date", "2022-01-01T00:00:00")
        pre_settings_dict = settings_dict.copy()
        pre_settings_dict["start_date"] = subtract_seconds_from_string(original_date, self.pre_calculation_seconds)
        pre_settings = Settings(**pre_settings_dict)

        return (
            network_type,
            network_parameters,
            self.pre_calculation_seconds,
            t_span,
            name_add_on,
            settings,
            pre_settings,
            False,
        )

    def pre_create_jacobians(self):
        """
        Pre-creates the jacobian matrices of all RCNetworks if not already found in pickle file.

        This is useful to quickly generate all jacobian matrices outside the workers (using up to
        ``max_core_number`` cores each) so that the workers only need to load the matrices and lambdify them.
        """
        for parameters in self.parameters_tuples:
            # Reset class attribute from Equation item to get the same hash values for equal networks
            EquationItem.item_counter = 0
            params = self.get_parameters(parameters)
            network_type, network_parameters = params[:2]

            network_parameters = {
                **network_parameters,
                "load_from_pickle": True,
                "save_to_pickle": True,
                "num_cores_jacobian": self.max_core_number,  # set to 1 for debugging
                "rc_objects": RCObjects(),
                "rc_solution": RCSolution(),
            }
            network: RCNetwork = network_type(**network_parameters)

            network.create_network()
            network.make_system_matrices()
            print(f"{params[self._NAME_ADD_ON_INDEX]}: Jacobi pre-calculation done for hash: {network.hash}")

    def _prepare_worker_parameters(self, parameters: tuple, process_id: int) -> tuple:
        """Return the ``_worker`` arguments for one simulation; an empty name add-on is replaced by its index."""
        # Same reset as in ``pre_create_jacobians``.
        # NOTE(review): presumably needed so that the worker's network gets the same hash as the pre-created one
        # (and finds its matrices on disk) - confirm against EquationItem.
        EquationItem.item_counter = 0
        worker_parameters = self.get_parameters(parameters)

        idx = self._NAME_ADD_ON_INDEX
        if worker_parameters[idx] is None or worker_parameters[idx] == "":
            worker_parameters = (*worker_parameters[:idx], f"{process_id}", *worker_parameters[idx + 1 :])
        return worker_parameters

    @staticmethod
    def _reap_finished(running: list[tuple]) -> list[tuple]:
        """Join and report all finished processes of ``running``; return the entries that are still alive."""
        still_running = []
        for process, (proc_id, name_add_on) in running:
            # ``is_alive`` is checked exactly once per process, so no finished process can slip through unjoined.
            if process.is_alive():
                still_running.append((process, (proc_id, name_add_on)))
                continue
            process.join()
            print(f"Process {proc_id} (name: {process.name}) with name AddOn '{name_add_on}' finished.")
            if process.exitcode != 0:
                print(f"Process {proc_id} (name: {process.name}) exited with code {process.exitcode}.")
        return still_running

    def run(self):
        """
        Run all simulations defined in ``self.parameters_tuples``.

        First, the jacobian matrices of all networks are created using all allowed cores. Afterwards each simulation
        is run by ``_worker``: directly one after another if only one core is allowed, otherwise in separate
        processes with at most ``max_core_number`` of them running at the same time.
        """
        # first create the jacobian matrices for all networks using all cores
        self.pre_create_jacobians()
        name_idx = self._NAME_ADD_ON_INDEX

        if self.max_core_number == 1:
            print("Using single core calculation.")
            for process_id, parameters in enumerate(self.parameters_tuples):
                worker_parameters = self._prepare_worker_parameters(parameters, process_id)
                print(f"Starting process: {worker_parameters[name_idx]}")
                self._worker(*worker_parameters)
            return

        # create single processes that run the parameter simulations
        running: list[tuple] = []  # entries: (process, (process_id, name_add_on))
        for process_id, parameters in enumerate(self.parameters_tuples):
            # Wait until a slot is free
            while len(running) >= self.max_core_number:
                remaining = self._reap_finished(running)
                if len(remaining) == len(running):
                    time.sleep(0.1)  # small delay to avoid busy-waiting
                running = remaining

            worker_parameters = self._prepare_worker_parameters(parameters, process_id)
            process = mp.Process(target=self._worker, args=worker_parameters)
            print(
                f"Starting process {process_id} - name AddOn: {worker_parameters[name_idx]} - "
                f"Proc name {process.name}"
            )
            process.start()
            running.append((process, (process_id, worker_parameters[name_idx])))

        # wait for the remaining processes
        while running:
            remaining = self._reap_finished(running)
            if len(remaining) == len(running):
                time.sleep(0.1)
            running = remaining

        print("All processes finished.")