Coverage for pyrc \ dataHandler \ weather.py: 29%

111 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-13 16:59 +0200

1# ------------------------------------------------------------------------------- 

2# Copyright (C) 2026 Joel Kimmich, Tim Jourdan 

3# ------------------------------------------------------------------------------ 

4# License 

5# This file is part of PyRC, distributed under GPL-3.0-or-later. 

6# ------------------------------------------------------------------------------ 

7 

8import os.path 

9from collections.abc import Callable 

10from datetime import datetime 

11 

12import numpy as np 

13import pandas as pd 

14from scipy.interpolate import PchipInterpolator 

15 

16from pyrc.core.wall import Wall 

17 

18 

19class WeatherData: 

20 def __init__( 

21 self, 

22 path, 

23 start_time: str | datetime = "2015-01-01T01:00", 

24 ): 

25 self.path = os.path.normpath(path) 

26 if not isinstance(start_time, datetime): 

27 start_time = datetime.fromisoformat(start_time) 

28 self.start_time: datetime = start_time 

29 self.datetime_col_index = 0 

30 self.edge_points = 5 

31 self.columns = [ 

32 "temp", # 0 

33 "pressure", # 1 

34 "wind direction", # 2 

35 "wind speed", # 3 

36 "cloudiness", # 4 

37 "moisture", # 5 

38 "relative_humidity", # 6 

39 "direct solar radiation", # 7 

40 "diffuse solar radiation", # 8 

41 "atmospheric heat radiation", # 9 

42 "terrestrial heat radiation", # 10 

43 ] 

44 

45 self.__period = None 

46 self.__interpolators = None 

47 

48 def __copy__(self): 

49 return WeatherData( 

50 path=self.path, 

51 start_time=datetime.fromisoformat(self.start_time.isoformat()), 

52 ) 

53 

54 def __deepcopy__(self): 

55 return self.__copy__() 

56 

57 @property 

58 def period(self): 

59 if self.__period is None: 

60 _, self.__period = self._make_interpolators() 

61 return self.__period 

62 

63 @property 

64 def interpolators(self) -> list[PchipInterpolator]: 

65 if self.__interpolators is None: 

66 self.__interpolators, self.__period = self._make_interpolators() 

67 return self.__interpolators 

68 

69 def get_raw_data(self) -> np.ndarray: 

70 df = pd.read_csv( 

71 self.path, 

72 skiprows=34, 

73 sep="\s+", 

74 header=None, 

75 names=["RW", "HW", "MM", "DD", "HH", "T", "P", "WR", "WG", "N", "x", "RF", "B", "D", "A", "E", "IL"], 

76 ) 

77 

78 df["datetime"] = pd.to_datetime(dict(year=2015, month=df["MM"], day=df["DD"], hour=df["HH"])) 

79 df.drop(columns=["RW", "HW", "IL", "MM", "DD", "HH"], inplace=True) 

80 

81 df["T"] += 273.15 # °C to K 

82 df["P"] *= 100 # hPa to Pa 

83 df["N"] /= 8.0 # 1/8 to 0–1 

84 df["x"] /= 1000.0 # g/kg to kg/kg 

85 

86 cols = ["datetime"] + [c for c in df.columns if c != "datetime"] 

87 df = df[cols] 

88 

89 result = df.to_numpy() 

90 result[:, 0] = result[:, 0].astype("datetime64[s]") 

91 

92 return result 

93 

94 def get_interpolator_values(self) -> tuple: 

95 

96 # rearrange matrix so that it starts at start_time 

97 seconds, data = self.reorder_matrix_by_time() 

98 columns = np.arange(1, data.shape[1]) # skip datetime column by default 

99 values = data[:, columns] 

100 

101 # edge padding using correct slices 

102 period = seconds[-1] - seconds[0] + (seconds[1] - seconds[0]) 

103 left = np.arange(-self.edge_points, 0) 

104 right = np.arange(0, self.edge_points) 

105 extended_seconds = np.concatenate([seconds[left] - period, seconds, seconds[right] + period]) 

106 extended_values = np.concatenate([values[left], values, values[right]]) 

107 return extended_seconds, extended_values, period 

108 

109 def _make_interpolators(self): 

110 extended_seconds, extended_values, period = self.get_interpolator_values() 

111 

112 interpolators = [ 

113 PchipInterpolator(extended_seconds, extended_values[:, i], extrapolate=False) 

114 for i in range(extended_values.shape[1]) 

115 ] 

116 return interpolators, period 

117 

118 def _radiation_values_on_wall(self, wall: Wall): 

119 """ 

120 Used to get some values you need for calculations with radiation. 

121 

122 See `self.get_radiation_on_wall_interpolator` 

123 

124 Parameters 

125 ---------- 

126 wall : Wall 

127 The `Wall` to calculate the incoming specific power on. 

128 

129 Returns 

130 ------- 

131 stuff 

132 """ 

133 seconds, values, period = self.get_interpolator_values() 

134 direct_rad: np.ndarray = values[:, 7] 

135 diffuse_rad: np.ndarray = values[:, 8] 

136 atmospheric_rad: np.ndarray = values[:, 9] 

137 terrestrial_rad: np.ndarray = -1 * values[:, 10] # change to positive 

138 

139 # parse the values on the wall (to recalculate all values for horizontal planes on the wall) 

140 dates = np.array(np.datetime64(self.start_time.isoformat())) + seconds * np.timedelta64(1, "s") 

141 direct_rad_values = wall.apply_direct_radiation(direct_rad, dates=dates) 

142 diffuse_rad_values = wall.apply_sky_radiation(diffuse_rad) 

143 atmospheric_rad_values = wall.apply_sky_radiation(atmospheric_rad) 

144 terrestrial_rad_values = wall.apply_ground_radiation(terrestrial_rad) 

145 

146 return seconds, direct_rad_values, diffuse_rad_values, atmospheric_rad_values, terrestrial_rad_values 

147 

148 def radiation_interpolator(self, wall: Wall, wavelength_type="short") -> Callable: 

149 """ 

150 Returns an interpolator that calculates the incoming specific power of short/long wavelengths on a wall in 

151 W/m^2. 

152 

153 Parameters 

154 ---------- 

155 wall : Wall 

156 The wall to calculate the incoming specific power on. 

157 wavelength_type : str | int | bool, optional 

158 The type of wavelengths to use. 

159 If "short", 0 or False, the interpolator for short wavelengths are returned, long otherwise. 

160 

161 Returns 

162 ------- 

163 Callable : 

164 The interpolator in the form ``f(time, *args, **kwargs)`` where only the time variable is actually used. 

165 """ 

166 seconds, direct_rad_values, diffuse_rad_values, atmospheric_rad_values, terrestrial_rad_values = ( 

167 self._radiation_values_on_wall(wall) 

168 ) 

169 

170 if wavelength_type == "short" or wavelength_type == 0 or wavelength_type == False: 

171 result_values = np.sum([direct_rad_values, diffuse_rad_values], axis=0) 

172 interpolator = PchipInterpolator(seconds, result_values, extrapolate=False) 

173 return lambda time, *args, **kwargs: interpolator(time % self.period) 

174 result_values = np.sum([atmospheric_rad_values, terrestrial_rad_values], axis=0) 

175 interpolator = PchipInterpolator(seconds, result_values, extrapolate=False) 

176 return lambda time, *args, **kwargs: interpolator(time % self.period) 

177 

178 def get_radiation_on_wall_interpolator(self, wall: Wall) -> Callable: 

179 """ 

180 Returns an interpolator that calculates the incoming specific power on a wall in W/m^2. 

181 

182 Parameters 

183 ---------- 

184 wall : Wall 

185 The wall to calculate the incoming specific power on. 

186 

187 Returns 

188 ------- 

189 Callable : 

190 The interpolator in the form ``f(time, *args, **kwargs)`` where only the time variable is actually used. 

191 

192 """ 

193 seconds, direct_rad_values, diffuse_rad_values, atmospheric_rad_values, terrestrial_rad_values = ( 

194 self._radiation_values_on_wall(wall) 

195 ) 

196 

197 # add all values up and create one Interpolator out of it 

198 result_values = np.sum( 

199 [direct_rad_values, diffuse_rad_values, atmospheric_rad_values, terrestrial_rad_values], axis=0 

200 ) 

201 interpolator = PchipInterpolator(seconds, result_values, extrapolate=False) 

202 return lambda time, *args, **kwargs: interpolator(time % self.period) 

203 

204 def get_interpolator(self, name) -> Callable: 

205 match name.lower(): 

206 case "t" | "temperature" | "temp" | "exterior" | "exterior_temperature" | "exterior temperature": 

207 return lambda time, *args, **kwargs: self.interpolators[0](time % self.period) 

208 case "direct solar" | "direct radiation" | "direct solar radiation" | "direct": 

209 return lambda time, *args, **kwargs: self.interpolators[7](time % self.period) 

210 case "diffuse solar" | "diffuse radiation" | "diffuse solar radiation" | "diffuse": 

211 return lambda time, *args, **kwargs: self.interpolators[8](time % self.period) 

212 case "atmospheric" | "atmospheric radiation": 

213 return lambda time, *args, **kwargs: self.interpolators[9](time % self.period) 

214 case "terrestrial" | "terrestrial radiation" | "ground radiation" | "ambient radiation": 

215 return lambda time, *args, **kwargs: -1 * self.interpolators[10](time % self.period) 

216 case _: 

217 raise NotImplementedError # implement more if needed 

218 

219 def reorder_matrix_by_time(self): 

220 matrix = self.get_raw_data() 

221 times = matrix[:, self.datetime_col_index].astype(datetime) 

222 

223 times_norm = np.array([dt.replace(year=1970) for dt in times]) 

224 

225 start_time_norm = self.start_time.replace(year=1970) 

226 

227 # Find the closest index 

228 closest_idx = np.argmin(np.abs(times_norm - start_time_norm)) 

229 

230 # Reorder the matrix 

231 start_matrix = matrix[closest_idx:] 

232 end_matrix = matrix[:closest_idx] 

233 end_matrix[:, 0] = np.array([d.replace(year=d.year + 1) for d in end_matrix[:, 0]]) 

234 matrix = np.concatenate((start_matrix, end_matrix), axis=0) 

235 time_seconds = np.array([(dt - matrix[0, 0]).total_seconds() for dt in matrix[:, 0]]) 

236 return time_seconds, matrix 

237 

238 

239# weather_data = WeatherData(path=r"C:\path\to\dwd_data\TRY2015_523748130791_Jahr.dat", 

240# start_time="2015-01-01 00:00:00") 

241 

242# if __name__ == "__main__": 

243# path = os.path.normpath(r"C:\path\to\dwd_data\TRY2015_523748130791_Jahr.dat") 

244# mat = load_data(path) 

245# test_fun=prepare_interpolator(mat, columns=[1], start_time="2015-01-01T00:00") 

246# labels = ["Temp"] 

247# 

248# x = list(range(0,8760*3600*2, 100)) 

249# y = np.array([test_fun(s) for s in x]) 

250# 

251# for i in range(y.shape[1]): 

252# plt.plot(np.array(x)/(3600*24),y[:,i], label=labels[i]) 

253# plt.show() 

254# 

255# print("done")