Coverage for pyrc \ postprocessing \ parse_scop_files.py: 9%

77 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-13 16:59 +0200

1# ------------------------------------------------------------------------------- 

2# Copyright (C) 2026 Joel Kimmich, Tim Jourdan 

3# ------------------------------------------------------------------------------ 

4# License 

5# This file is part of PyRC, distributed under GPL-3.0-or-later. 

6# ------------------------------------------------------------------------------ 

7 

8import re 

9 

10import numpy as np 

11import pandas as pd 

12from datetime import datetime 

13 

14 

# Separator line between two result blocks inside a SCOP file.
_SCOP_BLOCK_SEPARATOR = "--------------------------------------"

# Patterns are compiled once at import time instead of once per parsed block.
_SCOP_HEADER_RE = re.compile(
    r"SCOP\$_\{neu\}\$/SCOP\$_\{alt\}\$ (\w+) (\d{4}-\d{2}-\d{2}) ([\w\s-]+) in %:"
)
_SCOP_VL_RE = re.compile(r"(\d+\.\d+) für VL = (\d+)")
_SCOP_HEAT_AMOUNT_RE = re.compile(r"Wärmemenge / kWh/12/a: (.+)")
_SCOP_TIME_STEPS_RE = re.compile(r"for\s+(\d+)\s+time steps")

# Column order of the returned DataFrame; also used when nothing could be parsed.
_SCOP_COLUMNS = [
    "type",
    "period_type",
    "date",
    "orientation",
    "volume_flow",
    "VL",
    "value",
    "time_steps",
    "heat_amount",
]


def parse_scop_file(filenames: dict) -> pd.DataFrame:
    """
    Parse SCOP data files and extract one row per block and VL value.

    Each file is split into blocks at the dashed separator line. A block is only used if its first line
    matches the SCOP header pattern; all other blocks are skipped silently.

    Parameters
    ----------
    filenames : dict[str, str]
        Names (keys) and paths (values) of the SCOP files. The name is stored in the column ``type``.

    Returns
    -------
    pd.DataFrame :
        DataFrame with columns: type, period_type, date, orientation, volume_flow, VL, value, time_steps,
        heat_amount. The columns are present even if no block could be parsed (empty DataFrame).

        ``orientation`` defaults to "Süd" and ``volume_flow`` to 250; the header's third field overrides
        exactly one of them. ``time_steps`` and ``heat_amount`` are None if the block does not contain the
        respective line. All rows of one block reference the same ``heat_amount`` array.
    """
    rows = []
    for name, path in filenames.items():
        # NOTE(review): the file is read with the platform default encoding although the patterns contain
        # umlauts. If the SCOP files are always written as UTF-8, pass encoding="utf-8" here - confirm
        # against the code that writes them.
        with open(path, "r") as file:
            content = file.read()

        for block in content.split(_SCOP_BLOCK_SEPARATOR):
            block = block.strip()
            if not block:
                continue

            lines = block.split("\n")

            header_match = _SCOP_HEADER_RE.match(lines[0])
            if not header_match:
                continue

            period_type, date_str, subversion = header_match.groups()
            date = datetime.strptime(date_str, "%Y-%m-%d")

            # The third header field either starts with "vf<number>" (volume flow variation) or with the
            # orientation; the other quantity keeps its default.
            orientation = "Süd"
            volume_flow = 250
            if "vf" in subversion:
                volume_flow = int(subversion[2:].split()[0])
            else:
                orientation = subversion.split()[0]

            vl_values = {}
            heat_amount = None
            # The first line is the header and the last line is reserved for the time steps.
            for line in lines[1:-1]:
                line = line.strip()

                vl_match = _SCOP_VL_RE.match(line)
                if vl_match:
                    vl_values[int(vl_match.group(2))] = float(vl_match.group(1))

                heat_amount_match = _SCOP_HEAT_AMOUNT_RE.match(line)
                if heat_amount_match:
                    heat_amount = np.array([float(x) for x in heat_amount_match.group(1).split()])

            time_steps_match = _SCOP_TIME_STEPS_RE.search(lines[-1])
            time_steps = int(time_steps_match.group(1)) if time_steps_match else None

            # One output row per VL value of the block.
            for vl, value in vl_values.items():
                rows.append(
                    {
                        "type": name,
                        "period_type": period_type,
                        "date": date,
                        "orientation": orientation,
                        "volume_flow": volume_flow,
                        "VL": vl,
                        "value": value,
                        "time_steps": time_steps,
                        "heat_amount": heat_amount,
                    }
                )

    # Passing the columns explicitly keeps the schema stable when no row was parsed.
    return pd.DataFrame(rows, columns=_SCOP_COLUMNS)

103 

104 

def get_data_subset(df, period_type=None, orientation=None, volume_flow=None, vl=None, scop_type=None):
    """
    Filter DataFrame based on criteria.

    Every criterion is optional; a criterion that is None is ignored. All given criteria must match (logical
    AND). Each criterion may be a single value (matched by equality) or a list-like of values such as a list,
    tuple or set (matched by membership). Strings always count as a single value.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame from parse_scop_file. It is not modified.
    period_type : str | list, optional
        Filter by period type (day, week, year)
    orientation : str | list, optional
        Filter by orientation (Ost, Süd, ...)
    volume_flow : int | list, optional
        Filter by volume flow.
    vl : int | list, optional
        Filter by VL value(s)
    scop_type : str | list, optional
        The type of the SCOP values (e.g. preheat, complete, ...). Matched against the column ``type``.

    Returns
    -------
    pd.DataFrame
        Filtered DataFrame
    """
    # (column name, requested value) in the order the filters are applied.
    criteria = (
        ("period_type", period_type),
        ("orientation", orientation),
        ("volume_flow", volume_flow),
        ("type", scop_type),
        ("VL", vl),
    )

    result = df.copy()
    for column, wanted in criteria:
        if wanted is None:
            continue
        # is_list_like is False for plain strings, so "year" is compared by equality and not per character.
        if pd.api.types.is_list_like(wanted):
            mask = result[column].isin(wanted)
        else:
            mask = result[column] == wanted
        result = result[mask]

    return result

157 

158 

159def add_heat_dates(df: pd.DataFrame): 

160 """ 

161 For each array in column "heat_amount" it adds an array representing the time for each entry. 

162 

163 It also cleans the heat_amount arrays from unnecessary values (from overhanging time ranges and stuff). 

164 

165 Used before plotting the data. 

166 

167 Parameters 

168 ---------- 

169 df 

170 

171 Returns 

172 ------- 

173 pd.DataFrame : 

174 The same DataFrame but with new column "heat_amount_time" and a cleaned up version of the column "heat_amount" 

175 """ 

176 # only month and year are implemented because they are the important ones at first 

177 df = df.copy() 

178 

179 def process_row(row): 

180 if row["period_type"] == "month": 

181 return np.array(row["date"]), row["heat_amount"] 

182 elif row["period_type"] == "year": 

183 heat_amount = row["heat_amount"][:12] if row["heat_amount"] is not None else None 

184 year = row["date"].year 

185 heat_dates = np.array([pd.Timestamp(year, month, 1) for month in range(1, 13)]) 

186 return heat_dates, heat_amount 

187 else: 

188 return row["date"], row["heat_amount"] 

189 

190 df[["heat_dates", "heat_amount"]] = df.apply( 

191 lambda row: pd.Series(process_row(row)), axis=1 

192 ) 

193 

194 return df