Coverage for pyrc \ postprocessing \ parse_scop_files.py: 9%
77 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-13 16:59 +0200
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-13 16:59 +0200
1# -------------------------------------------------------------------------------
2# Copyright (C) 2026 Joel Kimmich, Tim Jourdan
3# ------------------------------------------------------------------------------
4# License
5# This file is part of PyRC, distributed under GPL-3.0-or-later.
6# ------------------------------------------------------------------------------
8import re
10import numpy as np
11import pandas as pd
12from datetime import datetime
# Column order of the frame built by ``parse_scop_file``. It is passed to the DataFrame
# constructor explicitly so that a result without rows still exposes every column.
_SCOP_COLUMNS = [
    "type",
    "period_type",
    "date",
    "orientation",
    "volume_flow",
    "VL",
    "value",
    "time_steps",
    "heat_amount",
]

# Values used when the header's sub-version does not state the respective quantity.
_SCOP_DEFAULT_ORIENTATION = "Süd"
_SCOP_DEFAULT_VOLUME_FLOW = 250

_SCOP_BLOCK_SEPARATOR = "--------------------------------------"

# Compiled once at import time instead of once per block/line.
_SCOP_HEADER_RE = re.compile(r"SCOP\$_\{neu\}\$/SCOP\$_\{alt\}\$ (\w+) (\d{4}-\d{2}-\d{2}) ([\w\s-]+) in %:")
_SCOP_VL_RE = re.compile(r"(\d+\.\d+) für VL = (\d+)")
_SCOP_HEAT_AMOUNT_RE = re.compile(r"Wärmemenge / kWh/12/a: (.+)")
_SCOP_TIME_STEPS_RE = re.compile(r"for\s+(\d+)\s+time steps")


def _parse_scop_block(name, block):
    """Convert one separator-delimited block into a list of row dicts (one per VL value)."""
    block = block.strip()
    if not block:
        return []

    lines = block.split("\n")

    header_match = _SCOP_HEADER_RE.match(lines[0])
    if not header_match:
        # Blocks that do not start with a SCOP header carry no data for us.
        return []

    period_type = header_match.group(1)
    date = datetime.strptime(header_match.group(2), "%Y-%m-%d")
    subversion = header_match.group(3).strip()

    # The sub-version either starts with "vf<number>" (volume flow) or with the orientation.
    orientation = _SCOP_DEFAULT_ORIENTATION
    volume_flow = _SCOP_DEFAULT_VOLUME_FLOW
    if subversion.startswith("vf"):
        volume_flow = int(subversion[2:].split()[0])
    else:
        orientation = subversion.split()[0]

    # The time steps are only read from the last line. That line is excluded from the
    # value parsing only if it really is the time steps line; otherwise a block without
    # such a line would silently lose its last value.
    time_steps_match = _SCOP_TIME_STEPS_RE.search(lines[-1])
    if time_steps_match:
        time_steps = int(time_steps_match.group(1))
        value_lines = lines[1:-1]
    else:
        time_steps = None
        value_lines = lines[1:]

    vl_values = {}
    heat_amount = None
    for line in value_lines:
        line = line.strip()

        vl_match = _SCOP_VL_RE.match(line)
        if vl_match:
            vl_values[int(vl_match.group(2))] = float(vl_match.group(1))
            continue

        heat_amount_match = _SCOP_HEAT_AMOUNT_RE.match(line)
        if heat_amount_match:
            heat_amount = np.array([float(x) for x in heat_amount_match.group(1).split()])

    # One row per VL; the block-level values are repeated on every row.
    return [
        {
            "type": name,
            "period_type": period_type,
            "date": date,
            "orientation": orientation,
            "volume_flow": volume_flow,
            "VL": vl,
            "value": value,
            "time_steps": time_steps,
            "heat_amount": heat_amount,
        }
        for vl, value in vl_values.items()
    ]


def parse_scop_file(filenames: dict) -> pd.DataFrame:
    """
    Parse SCOP data files and extract the values for each VL of every block.

    Each file is split into blocks at the separator line. A block is used if its first
    line is a SCOP header ("SCOP$_{neu}$/SCOP$_{alt}$ <period_type> <YYYY-MM-DD>
    <sub-version> in %:"); all other blocks are ignored. Within a block the lines
    "<value> für VL = <vl>", "Wärmemenge / kWh/12/a: <numbers>" and a final
    "for <n> time steps" line are evaluated.

    Parameters
    ----------
    filenames : dict[str, str]
        Names (keys) and paths (values) of the SCOP files.

    Returns
    -------
    pd.DataFrame :
        One row per parsed VL value with the columns: type (the key of ``filenames``),
        period_type, date, orientation, volume_flow, VL, value, time_steps, heat_amount.
        orientation defaults to "Süd" and volume_flow to 250 if the header's sub-version
        does not state them. time_steps and heat_amount are None if the block has no
        matching line. The columns are present even if no row was parsed.
    """
    rows = []
    for name, path in filenames.items():
        # NOTE(review): the file is read with the platform default encoding although the
        # parsed lines contain non-ASCII text ("für", "Wärmemenge"). Confirm the encoding
        # used by the writer and pass it explicitly.
        with open(path, "r") as file:
            content = file.read()

        for block in content.split(_SCOP_BLOCK_SEPARATOR):
            rows.extend(_parse_scop_block(name, block))

    return pd.DataFrame(rows, columns=_SCOP_COLUMNS)
def get_data_subset(df, period_type=None, orientation=None, volume_flow=None, vl=None, scop_type=None):
    """
    Return the rows of a parsed SCOP DataFrame that satisfy all given criteria.

    Criteria left at None are not applied. The input frame is not modified; a copy is
    filtered and returned.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame from parse_scop_file
    period_type : str, optional
        Keep rows whose "period_type" equals this value (day, week, year)
    orientation : str, optional
        Keep rows whose "orientation" equals this value (Ost, Süd, ...)
    volume_flow : int | list, optional
        Keep rows whose "volume_flow" equals the value or is contained in the list/tuple.
    vl : int | list, optional
        Keep rows whose "VL" equals the value or is contained in the list/tuple.
    scop_type : str | list, optional
        Keep rows whose "type" (e.g. preheat, complete, ...) equals the value or is
        contained in the list/tuple.

    Returns
    -------
    pd.DataFrame
        Filtered DataFrame
    """
    # (column, requested value, whether a list/tuple means "any of these values")
    criteria = (
        ("period_type", period_type, False),
        ("orientation", orientation, False),
        ("volume_flow", volume_flow, True),
        ("type", scop_type, True),
        ("VL", vl, True),
    )

    subset = df.copy()
    for column, wanted, accepts_sequence in criteria:
        if wanted is None:
            continue
        if accepts_sequence and isinstance(wanted, (list, tuple)):
            keep = subset[column].isin(wanted)
        else:
            keep = subset[column] == wanted
        subset = subset[keep]

    return subset
def add_heat_dates(df: pd.DataFrame):
    """
    Add a column "heat_dates" holding the time information for each "heat_amount" entry.

    It also cleans the heat_amount arrays from unnecessary values (from overhanging time
    ranges and stuff). Only the period types "month" and "year" are handled specially:

    - "month": heat_dates is ``np.array(date)``; heat_amount is kept as it is.
    - "year": heat_amount is cut to its first 12 entries (None stays None) and heat_dates
      is an array with the first day of each month of the year of ``date``.
    - any other period type: date and heat_amount are passed through unchanged.

    Used before plotting the data.

    Parameters
    ----------
    df : pd.DataFrame
        Frame with the columns "period_type", "date" and "heat_amount" (as created by
        parse_scop_file). It is not modified. A frame without rows is accepted, too.

    Returns
    -------
    pd.DataFrame :
        A copy of the DataFrame with the new column "heat_dates" and a cleaned up version
        of the column "heat_amount"
    """
    df = df.copy()

    if len(df) == 0:
        # Nothing to convert (possibly not even the input columns exist); only make sure
        # that both result columns are present.
        for column in ("heat_dates", "heat_amount"):
            if column not in df.columns:
                df[column] = pd.Series(index=df.index, dtype=object)
        return df

    heat_dates = []
    heat_amounts = []
    for period_type, date, heat_amount in zip(df["period_type"], df["date"], df["heat_amount"]):
        if period_type == "month":
            heat_dates.append(np.array(date))
            heat_amounts.append(heat_amount)
        elif period_type == "year":
            # Keep only the 12 monthly values; anything beyond is overhang.
            heat_amounts.append(heat_amount[:12] if heat_amount is not None else None)
            heat_dates.append(np.array([pd.Timestamp(date.year, month, 1) for month in range(1, 13)]))
        else:
            heat_dates.append(date)
            heat_amounts.append(heat_amount)

    # dtype=object keeps every cell as the object built above (arrays stay arrays).
    df["heat_dates"] = pd.Series(heat_dates, index=df.index, dtype=object)
    df["heat_amount"] = pd.Series(heat_amounts, index=df.index, dtype=object)

    return df