Coverage for pyrc \ core \ simulation.py: 13%
148 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-01 13:11 +0200
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-01 13:11 +0200
1# -------------------------------------------------------------------------------
2# Copyright (C) 2026 Joel Kimmich, Tim Jourdan
3# ------------------------------------------------------------------------------
4# License
5# This file is part of PyRC, distributed under GPL-3.0-or-later.
6# ------------------------------------------------------------------------------
8import multiprocessing as mp
9import os
10import time
11from copy import copy
12from typing import Any, Callable
14from pyrc.core.network import RCNetwork
15from pyrc.core.settings import initial_settings as i_settings, Settings
16from pyrc.core.components.templates import EquationItem, RCObjects, RCSolution
18from pyrc.tools.functions import add_leading_underscore, subtract_seconds_from_string
class Simulation:
    """
    Handle one RC network simulation including its pre-simulation.

    The pre-simulation determines suitable initial values for the network by simulating a time range before the
    real simulation. It can be run with varying weather data or as a static simulation of the initial boundary
    conditions of the network. Because the values of all capacities are time dependent, the variant with varying
    (realistic) boundary data is recommended.

    The result of the pre-simulation is saved as a single set of initial values. It is calculated only once and
    loaded from file on subsequent runs.
    """

    def __init__(
        self,
        network_class: type[RCNetwork] | None = None,
        network_keyword_arguments: dict[str, Any] | None = None,
        pre_calculation_seconds: int | float = 36000,
        t_span: tuple | None = None,
        name_add_on: str | None = None,
        settings: Settings | None = None,
        pre_calculation_settings: Settings | None = None,
        print_progress: bool = True,
        time_dependent_function: Callable | Any = None,
    ) -> None:
        """
        Store the defaults that ``run`` falls back to when it is called without arguments.

        Parameters
        ----------
        network_class : type[RCNetwork], optional
            The RCNetwork class that is instantiated for the simulation.
        network_keyword_arguments : dict[str, Any], optional
            The keyword arguments used to instantiate ``network_class``.
        pre_calculation_seconds : int | float, optional
            Length of the pre-simulation in seconds.
        t_span : tuple, optional
            The t_span for the simulation: (start, end) in seconds.
        name_add_on : str, optional
            An add-on for the names of the files written by the simulation.
        settings : Settings, optional
            The settings for the network and simulation.
        pre_calculation_settings : Settings, optional
            The settings for the pre-simulation.
        print_progress : bool, optional
            Whether to print progress information during the simulation.
        time_dependent_function : Callable, optional
            Function calculating all time dependent variables, see ``run``.
        """
        self.network_class = network_class
        self.network_keyword_arguments = network_keyword_arguments
        self.pre_calculation_seconds: int | float = pre_calculation_seconds
        self.t_span: tuple | None = t_span
        self.name_add_on: str | None = name_add_on
        self.settings: Settings | None = settings
        self.pre_calculation_settings: Settings | None = pre_calculation_settings
        self.print_progress: bool = print_progress
        self.time_dependent_function: Callable | None = time_dependent_function

        # Set by ``run``: the solved network of the last simulation.
        self.network: RCNetwork | None = None

    def run(
        self,
        network_class: type[RCNetwork] | None = None,
        network_keyword_arguments: dict[str, Any] | None = None,
        t_span: tuple | None = None,
        name_add_on: str | None = None,
        settings: Settings | None = None,
        pre_calculation_settings: Settings | None = None,
        print_progress: bool | None = None,
        time_dependent_function: Callable | Any = None,
    ):
        """
        Run the simulation including pre-simulation with the passed network type.

        Every argument that is not passed (or is falsy) falls back to the value given to ``__init__``.
        The solved network is stored in ``self.network``.

        Parameters
        ----------
        network_class : type[RCNetwork]
            The RCNetwork that is created for the simulation.
        network_keyword_arguments : dict[str, Any]
            The keyword arguments for the RCNetwork object that is created.
        t_span : tuple
            The t_span for the simulation: (start, end) in seconds.
            It should start at 0 (otherwise it can work but it's not tested).
        name_add_on : str, optional
            An add-on for the name to identify the name to the worker.
            If None, a random five digits integer is used (with leading zeros).
        settings : Settings, optional
            The settings for the network and simulation.
            If None, the initial settings dict from the network is used.
        pre_calculation_settings : Settings, optional
            The settings for the pre-calculation. Should only vary in the weather data start date.
            If None, the initial settings dict from the network is used, but with static calculation.
        print_progress : bool, optional
            Whether to print some progress information during the simulation.
        time_dependent_function : Callable, optional
            A function that calculates all time dependent variables within the time step and returns them in the
            same order as self.get_time_dependent_symbols().
            It is called either as ``time_dependent_function(time, temperature_vector)`` or as
            ``time_dependent_function(time, temperature_vector, input_vector)``.
            This function is required if time dependent symbols exist.
            It must return an iterable (e.g. list).
            To not run into errors just use ``*args``, ``**kwargs`` at the end in case more values are passed
            than needed.
        """
        # ``print_progress`` is a bool, so only ``None`` (not ``False``) may trigger the fallback.
        if print_progress is None:
            print_progress = self.print_progress

        self.network = self._worker(
            network_class=network_class or self.network_class,
            network_copy_dict=network_keyword_arguments or self.network_keyword_arguments or {},
            pre_calculation_seconds=self.pre_calculation_seconds,
            t_span_simulation=t_span or self.t_span,
            name_add_on=name_add_on or self.name_add_on,
            settings=settings or self.settings,
            pre_calculation_settings=pre_calculation_settings or self.pre_calculation_settings,
            print_progress=print_progress,
            return_network=True,
            time_dependent_function=time_dependent_function or self.time_dependent_function,
        )

    @staticmethod
    def _worker(
        network_class: type[RCNetwork],
        network_copy_dict: dict,
        pre_calculation_seconds: int | float,
        t_span_simulation: tuple,
        name_add_on: str = "",
        settings: Settings = None,
        pre_calculation_settings: Settings = None,
        print_progress: bool = True,
        return_network: bool = False,
        time_dependent_function: Callable | Any = None,
    ) -> RCNetwork | None:
        """
        Run a single simulation: create the network, pre-calculate (or load) initial values, then solve.

        Remember: The network shouldn't exist / be built yet because it's not picklable. The network is created
        newly in here and if the matrices are already created they are loaded from file.
        Not only the unpicklable state of the network forces the creation of the network within this method but
        also the network dependency on both rc_objects and rc_solution objects.

        Parameters
        ----------
        network_class : type[RCNetwork]
            The RCNetwork that is created for the simulation.
        network_copy_dict : dict
            A dictionary to give RCNetwork as keyword arguments for initializing.
            This bypasses moving the unpicklable RCNetwork object to the worker and instead creates it inside the
            worker. The dictionary itself is not modified; ``rc_objects`` and ``rc_solution`` entries are always
            replaced by fresh instances.
        pre_calculation_seconds : int | float
            See main class.
        t_span_simulation : tuple
            The t_span for the simulation: (start, end) in seconds.
            It should start at 0 (otherwise it can work but it's not tested).
        name_add_on : str, optional
            An add-on for the name to identify the name to the worker.
        settings : Settings, optional
            The settings for the network and simulation.
            If None, the initial settings dict from the network is used.
        pre_calculation_settings : Settings, optional
            The settings for the pre-calculation. Should only vary in the weather data start date.
            If None, the initial settings dict from the network is used, but with static calculation or a shifted
            weather start date, if use_weather_data.
        print_progress : bool, optional
            Whether to print some progress information during the simulation.
        return_network : bool, optional
            If True, the solved network is returned, otherwise None.
        time_dependent_function : Callable, optional
            Function calculating all time dependent variables; passed on to ``network.solve_network``.

        Returns
        -------
        RCNetwork | None :
            The solved network if ``return_network`` is True, otherwise None.

        Raises
        ------
        RuntimeError
            If the freshly saved pre-calculation result cannot be loaded back in.
        """
        # Just to be safe: create new RCObjects and RCSolution instances that are not linked to other simulations.
        # A new dict is built so that the caller's keyword arguments are left untouched.
        rc_objects = RCObjects()
        network_kwargs = {
            **network_copy_dict,
            "rc_objects": rc_objects,
            "rc_solution": RCSolution(rc_objects=rc_objects),
        }
        network: RCNetwork = network_class(**network_kwargs)
        if settings is None:
            # Remember the network's own settings to restore them after the pre-calculation.
            settings = copy(network.settings)
        else:
            # Work on a copy: the pre-calculation below changes attributes of ``network.settings``.
            network.settings = copy(settings)
        network.create_network()

        name_add_on = add_leading_underscore(name_add_on)

        name_prefix = os.path.join(network.settings.save_folder_path, f"{network.hash}{name_add_on}")
        single_solution_name = f"{name_prefix}_{pre_calculation_seconds}_single_solution.pickle"

        if network.load_initial_values(return_bool=True, pickle_path_single_solution=single_solution_name):
            print(f"{network.hash}: pre-calculation was loaded from file.")
        else:
            print(f"{network.hash}: starting pre-calculation")
            if pre_calculation_settings is not None:
                network.settings = pre_calculation_settings
            elif network.settings.use_weather_data:
                # Shift the weather start date back so the pre-calculation ends where the simulation starts.
                original_date = network.settings.start_date
                network.settings.start_date = subtract_seconds_from_string(original_date, pre_calculation_seconds)
                network.settings.calculate_static = False
                print(f"Dynamic pre-calculation with weather start date: {network.settings.start_date}")
            else:
                print("Static pre-calculation.")
                network.settings.calculate_static = True

            def evaluate_at_start(t, temp_vector, *args, **kwargs):
                # Always use the first value (time 0) for the static calculation.
                return time_dependent_function(0, temp_vector, *args, **kwargs)

            freeze_time = network.settings.calculate_static and time_dependent_function is not None
            pre_calculation_function = evaluate_at_start if freeze_time else time_dependent_function

            network.solve_network(
                (0, pre_calculation_seconds),
                print_progress=print_progress,
                name_add_on=name_add_on + "_pre_calculation",
                time_dependent_function=pre_calculation_function,
            )

            # Save last solution to load it back in later.
            network.rc_solution.save_last_step(single_solution_name)

            # Delete all pre-calculation solutions in the solutions object and free space for the real solution.
            network.rc_solution.delete_solutions(confirm=True)
            network.reset_properties()

            # The load is a required step, so it must not live inside an ``assert`` (stripped by ``python -O``).
            loaded = network.load_initial_values(return_bool=True, pickle_path_single_solution=single_solution_name)
            if not loaded:
                raise RuntimeError(
                    f"{network.hash}: pre-calculation result could not be loaded from '{single_solution_name}'."
                )
            network.settings = settings  # change back to original settings
            print(f"{network.hash}: pre-calculation done.")

        # The result is saved inside network.solve_network so it doesn't need to be saved in here.
        network.solve_network(
            t_span_simulation,
            print_progress=print_progress,
            name_add_on=name_add_on,
            time_dependent_function=time_dependent_function,
        )

        return network if return_network else None
class Parameterization(Simulation):
    """
    Class to handle `RCNetwork` calculations that are quite similar but differ in one settings parameter.

    All calculations are run in parallel (one process per parameter set, limited by ``max_core_number``).
    """

    # Position of ``name_add_on`` in the tuple returned by ``get_parameters`` (matches the ``_worker`` signature).
    _NAME_ADD_ON_INDEX = 4

    def __init__(
        self,
        parameters_tuples: list[tuple],
        pre_calculation_seconds=36000,
        initial_settings_dict: dict = i_settings,
        max_core_number=0,
        t_span=None,
    ):
        """
        Parameters
        ----------
        parameters_tuples : list[tuple]
            Defining the parameters for each simulation:
            for parameters in parameters_tuples:
                network_type: type = parameters[0]
                settings_dict = parameters[1]
                network_parameters = parameters[2]
                t_span = parameters[3]
                name_add_on = parameters[4]
        pre_calculation_seconds : int | float, optional
            How long the pre-calculation before the actual calculation should be.
        initial_settings_dict : dict, optional
            The initial settings to use for the calculations. If not given, the initial ones from
            ``core.settings`` are used.
        max_core_number : int, optional
            An optional limit how many cores can be used for the calculations.
            0 (or a negative value) for no limit. Never exceeds ``mp.cpu_count()``.
        t_span : tuple, optional
            Stored in ``self.t_span``; defaults to one year, ``(0, 8760 * 3600)`` seconds.
            Note that each run takes its time span from its own parameters tuple.
        """
        super().__init__(pre_calculation_seconds=pre_calculation_seconds, settings=initial_settings_dict)
        self.parameters_tuples: list[tuple] = parameters_tuples

        self.t_span = (0, 8760 * 3600) if t_span is None else t_span

        available_cores = mp.cpu_count()
        if max_core_number <= 0:
            max_core_number = available_cores
        self.max_core_number: int = min(max_core_number, available_cores)

    def get_parameters(self, parameters: tuple):
        """
        Returns the first few parameters for the worker method.

        The passed tuple and the dictionaries inside it are not modified.

        Parameters
        ----------
        parameters : tuple
            The tuple out of self.parameters_tuples

        Returns
        -------
        tuple :
            ``(network_type, network_parameters, pre_calculation_seconds, t_span, name_add_on, settings,
            pre_calculation_settings, print_progress)`` in the positional order of ``_worker``.
            ``print_progress`` is always False.
        """
        network_type, settings_dict, network_parameters, t_span, name_add_on = parameters[:5]

        settings = Settings(**settings_dict)
        # Build a new dict instead of updating the caller's one that lives inside self.parameters_tuples.
        network_parameters = {
            **network_parameters,
            "settings": settings,
            "load_from_pickle": True,
            "save_to_pickle": True,
            "num_cores_jacobian": 1,
            "rc_objects": RCObjects(),
            "rc_solution": RCSolution(),
        }

        # The pre-calculation uses the same settings but starts ``pre_calculation_seconds`` earlier.
        original_date = settings_dict.get("start_date", "2022-01-01T00:00:00")
        pre_settings_dict = {
            **settings_dict,
            "start_date": subtract_seconds_from_string(original_date, self.pre_calculation_seconds),
        }
        pre_settings = Settings(**pre_settings_dict)

        return (
            network_type,
            network_parameters,
            self.pre_calculation_seconds,
            t_span,
            name_add_on,
            settings,
            pre_settings,
            False,
        )

    def pre_create_jacobians(self):
        """
        Pre-creates the jacobian matrices of all RCNetworks if not already found in pickle file.

        This is useful to quickly generate all jacobian matrices outside the workers so that they only need to
        load the matrices and lambdify them.

        Returns
        -------
        None
        """
        for parameters in self.parameters_tuples:
            # Reset class attribute from Equation item to get the same hash values for equal networks
            EquationItem.item_counter = 0
            params = self.get_parameters(parameters)
            network_type, network_parameters = params[:2]

            # Same arguments as for the workers, but the jacobian may use all allowed cores here.
            network_parameters = {
                **network_parameters,
                "load_from_pickle": True,
                "save_to_pickle": True,
                "num_cores_jacobian": self.max_core_number,
                "rc_objects": RCObjects(),
                "rc_solution": RCSolution(),
            }
            network: RCNetwork = network_type(**network_parameters)

            network.create_network()
            network.make_system_matrices()
            name_add_on = params[self._NAME_ADD_ON_INDEX]
            print(f"{name_add_on}: Jacobi pre-calculation done for hash: {network.hash}")

    @classmethod
    def _with_default_name_add_on(cls, worker_parameters: tuple, default: str) -> tuple:
        """Return ``worker_parameters`` with an empty or missing ``name_add_on`` replaced by ``default``."""
        index = cls._NAME_ADD_ON_INDEX
        name_add_on = worker_parameters[index]
        if name_add_on is None or name_add_on == "":
            return (*worker_parameters[:index], default, *worker_parameters[index + 1 :])
        return worker_parameters

    @staticmethod
    def _reap_finished(running: list) -> tuple[list, list]:
        """
        Join all processes that have ended.

        ``is_alive`` is queried exactly once per process so no process can be dropped without being joined.

        Parameters
        ----------
        running : list
            ``(process, (process_id, name_add_on, process_name))`` pairs.

        Returns
        -------
        tuple[list, list] :
            The pairs that are still running and, for every ended process, a tuple
            ``(process_id, name_add_on, process_name, exitcode)``.
        """
        still_running = []
        finished = []
        for process, info in running:
            if process.is_alive():
                still_running.append((process, info))
            else:
                process.join()
                finished.append((*info, process.exitcode))
        return still_running, finished

    @staticmethod
    def _report_failure(process_id, exit_code) -> None:
        """Print a warning if a worker process ended with a non-zero exit code."""
        if exit_code:
            print(f"Process {process_id} ended with exit code {exit_code}.")

    def _run_sequential(self) -> None:
        """Run all parameter sets one after another in the current process."""
        print("Using single core calculation.")
        # NOTE(review): pre_create_jacobians resets EquationItem.item_counter before each network to keep the
        # hashes equal; here the networks are built in the same process without that reset - confirm whether
        # the cached matrices are still found in this mode.
        for run_index, parameters in enumerate(self.parameters_tuples):
            worker_parameters = self._with_default_name_add_on(self.get_parameters(parameters), f"{run_index}")
            print(f"Starting process: {worker_parameters[self._NAME_ADD_ON_INDEX]}")
            self._worker(*worker_parameters)

    def _run_parallel(self) -> None:
        """Run every parameter set in its own process, at most ``max_core_number`` at the same time."""
        running = []
        for process_id, parameters in enumerate(self.parameters_tuples):
            # Wait until a slot is free
            while len(running) >= self.max_core_number:
                running, finished = self._reap_finished(running)
                for proc_id, name_addon, p_name, exit_code in finished:
                    print(f"Process {proc_id} with name AddOn '{name_addon}' finished. Proc name {p_name}")
                    self._report_failure(proc_id, exit_code)
                if not finished:
                    time.sleep(0.1)  # small delay to avoid busy-waiting

            worker_parameters = self._with_default_name_add_on(self.get_parameters(parameters), f"{process_id}")
            name_add_on = worker_parameters[self._NAME_ADD_ON_INDEX]
            process = mp.Process(target=self._worker, args=worker_parameters)
            print(f"Starting process {process_id} - name AddOn: {name_add_on} - Proc name {process.name}")
            process.start()
            running.append((process, (process_id, name_add_on, process.name)))

        # All processes are started: wait for the remaining ones.
        while running:
            running, finished = self._reap_finished(running)
            for proc_id, name_addon, p_name, exit_code in finished:
                print(f"Process {proc_id} (name: {p_name}) with name AddOn '{name_addon}' finished.")
                self._report_failure(proc_id, exit_code)
            if not finished:
                time.sleep(0.1)

        print("All processes finished.")

    def run(self):
        """
        Run all parameterized simulations.

        First the jacobian matrices of all networks are created (using all allowed cores), then every parameter
        set is simulated: sequentially if ``max_core_number`` is 1, otherwise in separate processes.
        Runs without a ``name_add_on`` get their running index as name add-on.
        """
        # first create the jacobian matrices for all networks using all cores
        self.pre_create_jacobians()

        if self.max_core_number == 1:
            self._run_sequential()
        else:
            self._run_parallel()