Coverage for pyrc \ dataHandler \ weather.py: 29%
111 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-13 16:59 +0200
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-13 16:59 +0200
1# -------------------------------------------------------------------------------
2# Copyright (C) 2026 Joel Kimmich, Tim Jourdan
3# ------------------------------------------------------------------------------
4# License
5# This file is part of PyRC, distributed under GPL-3.0-or-later.
6# ------------------------------------------------------------------------------
8import os.path
9from collections.abc import Callable
10from datetime import datetime
12import numpy as np
13import pandas as pd
14from scipy.interpolate import PchipInterpolator
16from pyrc.core.wall import Wall
19class WeatherData:
20 def __init__(
21 self,
22 path,
23 start_time: str | datetime = "2015-01-01T01:00",
24 ):
25 self.path = os.path.normpath(path)
26 if not isinstance(start_time, datetime):
27 start_time = datetime.fromisoformat(start_time)
28 self.start_time: datetime = start_time
29 self.datetime_col_index = 0
30 self.edge_points = 5
31 self.columns = [
32 "temp", # 0
33 "pressure", # 1
34 "wind direction", # 2
35 "wind speed", # 3
36 "cloudiness", # 4
37 "moisture", # 5
38 "relative_humidity", # 6
39 "direct solar radiation", # 7
40 "diffuse solar radiation", # 8
41 "atmospheric heat radiation", # 9
42 "terrestrial heat radiation", # 10
43 ]
45 self.__period = None
46 self.__interpolators = None
48 def __copy__(self):
49 return WeatherData(
50 path=self.path,
51 start_time=datetime.fromisoformat(self.start_time.isoformat()),
52 )
54 def __deepcopy__(self):
55 return self.__copy__()
57 @property
58 def period(self):
59 if self.__period is None:
60 _, self.__period = self._make_interpolators()
61 return self.__period
63 @property
64 def interpolators(self) -> list[PchipInterpolator]:
65 if self.__interpolators is None:
66 self.__interpolators, self.__period = self._make_interpolators()
67 return self.__interpolators
69 def get_raw_data(self) -> np.ndarray:
70 df = pd.read_csv(
71 self.path,
72 skiprows=34,
73 sep="\s+",
74 header=None,
75 names=["RW", "HW", "MM", "DD", "HH", "T", "P", "WR", "WG", "N", "x", "RF", "B", "D", "A", "E", "IL"],
76 )
78 df["datetime"] = pd.to_datetime(dict(year=2015, month=df["MM"], day=df["DD"], hour=df["HH"]))
79 df.drop(columns=["RW", "HW", "IL", "MM", "DD", "HH"], inplace=True)
81 df["T"] += 273.15 # °C to K
82 df["P"] *= 100 # hPa to Pa
83 df["N"] /= 8.0 # 1/8 to 0–1
84 df["x"] /= 1000.0 # g/kg to kg/kg
86 cols = ["datetime"] + [c for c in df.columns if c != "datetime"]
87 df = df[cols]
89 result = df.to_numpy()
90 result[:, 0] = result[:, 0].astype("datetime64[s]")
92 return result
94 def get_interpolator_values(self) -> tuple:
96 # rearrange matrix so that it starts at start_time
97 seconds, data = self.reorder_matrix_by_time()
98 columns = np.arange(1, data.shape[1]) # skip datetime column by default
99 values = data[:, columns]
101 # edge padding using correct slices
102 period = seconds[-1] - seconds[0] + (seconds[1] - seconds[0])
103 left = np.arange(-self.edge_points, 0)
104 right = np.arange(0, self.edge_points)
105 extended_seconds = np.concatenate([seconds[left] - period, seconds, seconds[right] + period])
106 extended_values = np.concatenate([values[left], values, values[right]])
107 return extended_seconds, extended_values, period
109 def _make_interpolators(self):
110 extended_seconds, extended_values, period = self.get_interpolator_values()
112 interpolators = [
113 PchipInterpolator(extended_seconds, extended_values[:, i], extrapolate=False)
114 for i in range(extended_values.shape[1])
115 ]
116 return interpolators, period
118 def _radiation_values_on_wall(self, wall: Wall):
119 """
120 Used to get some values you need for calculations with radiation.
122 See `self.get_radiation_on_wall_interpolator`
124 Parameters
125 ----------
126 wall : Wall
127 The `Wall` to calculate the incoming specific power on.
129 Returns
130 -------
131 stuff
132 """
133 seconds, values, period = self.get_interpolator_values()
134 direct_rad: np.ndarray = values[:, 7]
135 diffuse_rad: np.ndarray = values[:, 8]
136 atmospheric_rad: np.ndarray = values[:, 9]
137 terrestrial_rad: np.ndarray = -1 * values[:, 10] # change to positive
139 # parse the values on the wall (to recalculate all values for horizontal planes on the wall)
140 dates = np.array(np.datetime64(self.start_time.isoformat())) + seconds * np.timedelta64(1, "s")
141 direct_rad_values = wall.apply_direct_radiation(direct_rad, dates=dates)
142 diffuse_rad_values = wall.apply_sky_radiation(diffuse_rad)
143 atmospheric_rad_values = wall.apply_sky_radiation(atmospheric_rad)
144 terrestrial_rad_values = wall.apply_ground_radiation(terrestrial_rad)
146 return seconds, direct_rad_values, diffuse_rad_values, atmospheric_rad_values, terrestrial_rad_values
148 def radiation_interpolator(self, wall: Wall, wavelength_type="short") -> Callable:
149 """
150 Returns an interpolator that calculates the incoming specific power of short/long wavelengths on a wall in
151 W/m^2.
153 Parameters
154 ----------
155 wall : Wall
156 The wall to calculate the incoming specific power on.
157 wavelength_type : str | int | bool, optional
158 The type of wavelengths to use.
159 If "short", 0 or False, the interpolator for short wavelengths are returned, long otherwise.
161 Returns
162 -------
163 Callable :
164 The interpolator in the form ``f(time, *args, **kwargs)`` where only the time variable is actually used.
165 """
166 seconds, direct_rad_values, diffuse_rad_values, atmospheric_rad_values, terrestrial_rad_values = (
167 self._radiation_values_on_wall(wall)
168 )
170 if wavelength_type == "short" or wavelength_type == 0 or wavelength_type == False:
171 result_values = np.sum([direct_rad_values, diffuse_rad_values], axis=0)
172 interpolator = PchipInterpolator(seconds, result_values, extrapolate=False)
173 return lambda time, *args, **kwargs: interpolator(time % self.period)
174 result_values = np.sum([atmospheric_rad_values, terrestrial_rad_values], axis=0)
175 interpolator = PchipInterpolator(seconds, result_values, extrapolate=False)
176 return lambda time, *args, **kwargs: interpolator(time % self.period)
178 def get_radiation_on_wall_interpolator(self, wall: Wall) -> Callable:
179 """
180 Returns an interpolator that calculates the incoming specific power on a wall in W/m^2.
182 Parameters
183 ----------
184 wall : Wall
185 The wall to calculate the incoming specific power on.
187 Returns
188 -------
189 Callable :
190 The interpolator in the form ``f(time, *args, **kwargs)`` where only the time variable is actually used.
192 """
193 seconds, direct_rad_values, diffuse_rad_values, atmospheric_rad_values, terrestrial_rad_values = (
194 self._radiation_values_on_wall(wall)
195 )
197 # add all values up and create one Interpolator out of it
198 result_values = np.sum(
199 [direct_rad_values, diffuse_rad_values, atmospheric_rad_values, terrestrial_rad_values], axis=0
200 )
201 interpolator = PchipInterpolator(seconds, result_values, extrapolate=False)
202 return lambda time, *args, **kwargs: interpolator(time % self.period)
204 def get_interpolator(self, name) -> Callable:
205 match name.lower():
206 case "t" | "temperature" | "temp" | "exterior" | "exterior_temperature" | "exterior temperature":
207 return lambda time, *args, **kwargs: self.interpolators[0](time % self.period)
208 case "direct solar" | "direct radiation" | "direct solar radiation" | "direct":
209 return lambda time, *args, **kwargs: self.interpolators[7](time % self.period)
210 case "diffuse solar" | "diffuse radiation" | "diffuse solar radiation" | "diffuse":
211 return lambda time, *args, **kwargs: self.interpolators[8](time % self.period)
212 case "atmospheric" | "atmospheric radiation":
213 return lambda time, *args, **kwargs: self.interpolators[9](time % self.period)
214 case "terrestrial" | "terrestrial radiation" | "ground radiation" | "ambient radiation":
215 return lambda time, *args, **kwargs: -1 * self.interpolators[10](time % self.period)
216 case _:
217 raise NotImplementedError # implement more if needed
219 def reorder_matrix_by_time(self):
220 matrix = self.get_raw_data()
221 times = matrix[:, self.datetime_col_index].astype(datetime)
223 times_norm = np.array([dt.replace(year=1970) for dt in times])
225 start_time_norm = self.start_time.replace(year=1970)
227 # Find the closest index
228 closest_idx = np.argmin(np.abs(times_norm - start_time_norm))
230 # Reorder the matrix
231 start_matrix = matrix[closest_idx:]
232 end_matrix = matrix[:closest_idx]
233 end_matrix[:, 0] = np.array([d.replace(year=d.year + 1) for d in end_matrix[:, 0]])
234 matrix = np.concatenate((start_matrix, end_matrix), axis=0)
235 time_seconds = np.array([(dt - matrix[0, 0]).total_seconds() for dt in matrix[:, 0]])
236 return time_seconds, matrix
239# weather_data = WeatherData(path=r"C:\path\to\dwd_data\TRY2015_523748130791_Jahr.dat",
240# start_time="2015-01-01 00:00:00")
242# if __name__ == "__main__":
243# path = os.path.normpath(r"C:\path\to\dwd_data\TRY2015_523748130791_Jahr.dat")
244# mat = load_data(path)
245# test_fun=prepare_interpolator(mat, columns=[1], start_time="2015-01-01T00:00")
246# labels = ["Temp"]
247#
248# x = list(range(0,8760*3600*2, 100))
249# y = np.array([test_fun(s) for s in x])
250#
251# for i in range(y.shape[1]):
252# plt.plot(np.array(x)/(3600*24),y[:,i], label=labels[i])
253# plt.show()
254#
255# print("done")