Skip to content

Commit a5e45ab

Browse files
Add refactored data processing module to reporting
Signed-off-by: Flora <[email protected]>
1 parent 8706709 commit a5e45ab

File tree

2 files changed

+363
-0
lines changed

2 files changed

+363
-0
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
## New Features
1212

1313
- Added consistent logger setup across all modules for structured logging and improved observability. Example notebooks updated to demonstrate logger usage.
14+
- Added a `data_processing` module to `reporting` that provides a set of functions for processing, enriching, and analyzing time-series energy data from microgrid systems.
1415

1516
## Bug Fixes
1617

Lines changed: 362 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,362 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Data processing utilities for microgrid energy reporting.
5+
6+
This module provides a set of functions for processing, enriching, and analyzing
7+
time-series energy data from microgrid systems. It focuses on preparing data for
8+
PV (photovoltaic), battery, and grid energy flows, transforming it into a consistent
9+
structure for visualization, reporting, and analysis.
10+
11+
Features
12+
--------
13+
- Enriches raw energy data with derived columns such as:
14+
- PV production, self-consumption, feed-in, and battery charging.
15+
- Net grid import and PV self-consumption share.
16+
- Handles time zone localization and conversion to Europe/Berlin.
17+
- Dynamically renames columns to more descriptive names, including
18+
mapping component IDs (e.g., "PV #1", "Batterie #2").
19+
- Provides summary energy mix breakdowns (PV vs grid) in kWh, % share, and average kW.
20+
- Prepares tailored DataFrames for PV and battery analysis, supporting flexible
21+
filtering by component.
22+
23+
Main Functions
24+
--------------
25+
- `transform_energy_dataframe(df, component_types, mcfg)`:
26+
Transforms a raw DataFrame with energy metrics into an enriched,
27+
user-friendly format, adding PV, battery, and grid metrics.
28+
29+
- `compute_power_df(main_df, resolution)`:
30+
Computes total energy drawn from PV and grid sources over the given resolution,
31+
returning a summary DataFrame with kWh, percentage, and average kW.
32+
33+
- `print_pv_sums(main_df, resolution)`:
34+
Prints total PV feed-in sums for each individual PV component
35+
in a localized numeric format.
36+
37+
- `create_pv_analysis_df(main_df, pv_filter, pvgrid_filter, pv_grid_filter_options)`:
38+
Generates a DataFrame for PV analysis based on selected PV components
39+
and whether to analyze PV alone, grid alone, or a grid/PV split.
40+
41+
- `create_battery_analysis_df(main_df, bat_filter)`:
42+
Creates a DataFrame for analyzing battery throughput, reshaping
43+
it to long format for multi-battery analysis.
44+
45+
Usage
46+
-----
47+
Typical usage involves:
48+
1. Loading a raw DataFrame with time-indexed energy measurements.
49+
2. Calling `transform_energy_dataframe` to process and enrich it.
50+
3. Using the resulting DataFrames to generate summaries,
51+
for example with `compute_power_df`, `create_pv_analysis_df`, or
52+
`create_battery_analysis_df` for visualization.
53+
"""
54+
55+
from typing import Any, Dict, Iterable, List, Tuple, Union
56+
57+
import pandas as pd
58+
59+
# Constants

# Time zone that report timestamps are converted to.
TZ_NAME = "Europe/Berlin"

# Timestamp column: raw name and the display name it is renamed to.
COLUMN_TIMESTAMP = "timestamp"
COLUMN_TIMESTAMP_NAMED = "Zeitpunkt"

# Grid connection: raw name, display name, and the derived net-import column
# (grid values clipped at zero in `transform_energy_dataframe`).
COLUMN_GRID = "grid"
COLUMN_GRID_NAMED = "Netzanschluss"
COLUMN_NET_IMPORT = "Netzbezug"

# Consumption: raw name and display name.
COLUMN_CONSUMPTION = "consumption"
COLUMN_CONSUMPTION_NAMED = "Brutto Gesamtverbrauch"

# Battery: raw throughput column, raw column used to cap the PV-to-battery
# flow, and the display name of the throughput column.
COLUMN_BATTERY = "battery"
COLUMN_BATTERY_POS = "battery_pos"
COLUMN_BATTERY_NAMED = "Batterie Durchsatz"

# PV: raw input columns, intermediate helper columns, and the display names of
# the derived metrics.
COLUMN_PV = "pv"
COLUMN_PV_PROD = "PV Produktion"
COLUMN_PV_NEG = "pv_neg"
COLUMN_PV_EXCESS = "pv_excess"
COLUMN_PV_FEEDIN = "PV Einspeisung"
COLUMN_PV_SELF = "PV Eigenverbrauch"
COLUMN_PV_BAT = "pv_bat"
COLUMN_PV_IN_BAT = "PV in Batterie"
COLUMN_PV_SHARE = "PV Eigenverbrauchsanteil"
COLUMN_PV_THROUGHPUT = "PV Durchsatz"
81+
82+
83+
def transform_energy_dataframe(
    df: pd.DataFrame,
    component_types: List[str],
    mcfg: Any,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Transform and enrich an energy DataFrame.

    The index is moved into a regular column, PV-derived columns are added
    when "pv" is among the component types, the timestamp column is converted
    to `TZ_NAME`, raw columns are renamed to their display names, and a net
    grid import column is derived.

    Args:
        df: Raw DataFrame with energy metrics. After the index has been reset
            it must contain the `timestamp`, `grid` and `consumption` columns.
            An unnamed `DatetimeIndex` is treated as the timestamp.
        component_types: Component types present in the DataFrame
            (e.g., ["pv", "battery"]).
        mcfg: Microgrid configuration object; only
            `component_type_ids(component_type=...)` is called on it, to find
            which numeric column names belong to batteries and PV systems.

    Returns:
        A tuple containing:
            - `main_df`: The subset of columns used for visualization and
              reporting, in the column order of `df_renamed`.
            - `df_renamed`: The fully enriched DataFrame with all derived
              columns and renamed component IDs.
    """
    has_pv = "pv" in component_types
    has_battery = "battery" in component_types

    # An unnamed DatetimeIndex would come out of reset_index() as a column
    # called "index", and the timestamp lookup further down would fail.
    if (
        COLUMN_TIMESTAMP not in df.columns
        and isinstance(df.index, pd.DatetimeIndex)
        and df.index.name is None
    ):
        df = df.rename_axis(COLUMN_TIMESTAMP)

    # Move the index into a regular column. reset_index() returns a new frame,
    # so the column assignments below do not touch the caller's DataFrame.
    df = df.reset_index()

    # Enrich with PV-related columns
    if has_pv:
        df[COLUMN_PV_PROD] = -df.get(COLUMN_PV_NEG, 0)
        df[COLUMN_PV_EXCESS] = (df[COLUMN_PV_PROD] - df[COLUMN_CONSUMPTION]).clip(
            lower=0
        )

        if has_battery:
            # The battery cannot take more PV energy than the PV excess.
            df[COLUMN_PV_IN_BAT] = df[[COLUMN_PV_EXCESS, COLUMN_BATTERY_POS]].min(
                axis=1
            )
        else:
            df[COLUMN_PV_IN_BAT] = 0

        df[COLUMN_PV_FEEDIN] = df[COLUMN_PV_EXCESS] - df[COLUMN_PV_IN_BAT]
        df[COLUMN_PV_SELF] = (df[COLUMN_PV_PROD] - df[COLUMN_PV_EXCESS]).clip(lower=0)

        # Mask zero consumption with NaN so the share is undefined instead of
        # infinite. A NaN mask keeps the column numeric; replacing zeros with
        # pd.NA could turn it into an object column depending on the data.
        consumption = df[COLUMN_CONSUMPTION]
        df[COLUMN_PV_SHARE] = df[COLUMN_PV_SELF] / consumption.where(consumption != 0)

    # Convert timestamp to Berlin time; naive timestamps are taken to be UTC.
    if df[COLUMN_TIMESTAMP].dt.tz is None:
        df[COLUMN_TIMESTAMP] = df[COLUMN_TIMESTAMP].dt.tz_localize("UTC")
    df[COLUMN_TIMESTAMP] = df[COLUMN_TIMESTAMP].dt.tz_convert(TZ_NAME)

    # Basic renaming. Derived PV columns already carry their display names,
    # so they need no entry here.
    rename_map: Dict[str, str] = {
        COLUMN_TIMESTAMP: COLUMN_TIMESTAMP_NAMED,
        COLUMN_GRID: COLUMN_GRID_NAMED,
        COLUMN_CONSUMPTION: COLUMN_CONSUMPTION_NAMED,
    }

    if has_battery:
        rename_map[COLUMN_BATTERY] = COLUMN_BATTERY_NAMED

    if has_pv:
        rename_map["pv"] = COLUMN_PV_THROUGHPUT
        if has_battery:
            # NOTE(review): if the raw frame already carries a `pv_bat` column,
            # this rename produces a second column with the same display name
            # as the derived one above - confirm whether that input occurs.
            rename_map[COLUMN_PV_BAT] = COLUMN_PV_IN_BAT

    # Rename individual component IDs (columns whose name is all digits).
    single_comp = [
        col for col in df.columns if isinstance(col, str) and col.isdigit()
    ]

    if has_battery:
        battery_ids = {
            str(i) for i in mcfg.component_type_ids(component_type="battery")
        }
        rename_map.update(
            {col: f"Batterie #{col}" for col in single_comp if col in battery_ids}
        )

    if has_pv:
        pv_ids = {str(i) for i in mcfg.component_type_ids(component_type="pv")}
        rename_map.update({col: f"PV #{col}" for col in single_comp if col in pv_ids})

    df_renamed = df.rename(columns=rename_map)

    # Add derived net import column: only the positive part of the grid values.
    df_renamed[COLUMN_NET_IMPORT] = df_renamed[COLUMN_GRID_NAMED].clip(lower=0)

    # Select main columns for compact display
    wanted = {
        COLUMN_TIMESTAMP_NAMED,
        COLUMN_GRID_NAMED,
        COLUMN_NET_IMPORT,
        COLUMN_CONSUMPTION_NAMED,
    }

    if has_battery:
        wanted.add(COLUMN_BATTERY_NAMED)

    if has_pv:
        wanted.update(
            {
                COLUMN_PV_THROUGHPUT,
                COLUMN_PV_PROD,
                COLUMN_PV_SELF,
                COLUMN_PV_FEEDIN,
            }
        )
        if has_battery:
            # NOTE(review): the PV share column is only exposed together with
            # a battery although it does not depend on one - confirm intent.
            wanted.update({COLUMN_PV_IN_BAT, COLUMN_PV_SHARE})

    # Individual component columns ("PV #1", "Batterie #3", ...) are always
    # kept; the comprehension preserves the column order of df_renamed.
    main_columns = [
        col for col in df_renamed.columns if col in wanted or "#" in str(col)
    ]
    main_df = df_renamed[main_columns]

    return main_df, df_renamed
209+
210+
211+
def compute_power_df(
    main_df: pd.DataFrame, resolution: Union[str, pd.Timedelta]
) -> pd.DataFrame:
    """Compute energy mix (PV vs grid) and return a summary power DataFrame.

    Args:
        main_df: DataFrame with energy data, including 'Netzbezug'
            and optionally 'PV Eigenverbrauch'.
        resolution: Time resolution of each row in the DataFrame (e.g., "15min").

    Returns:
        A DataFrame with one row per energy source ("PV" and "Netz", or only
        "Netz" when there is no PV self-consumption column) and the columns
        "Energiebezug", "Energie [kWh]", "Energie %" and "Energie [kW]".
        When both sources sum to zero, both percentages are reported as 0.0.
    """
    step = pd.to_timedelta(resolution)
    hours = step.total_seconds() / 3600

    # Calculate energy from grid: each row is weighted by the row duration.
    grid_kwh = round(main_df[COLUMN_NET_IMPORT].sum() * hours, 2)

    if COLUMN_PV_SELF not in main_df.columns:
        # Only grid consumption available
        return pd.DataFrame(
            {
                "Energiebezug": ["Netz"],
                "Energie [kWh]": [grid_kwh],
                "Energie %": [100.0],
                "Energie [kW]": [round(grid_kwh / hours, 2)],
            }
        )

    # Calculate energy from PV
    pv_self_kwh = round(main_df[COLUMN_PV_SELF].sum() * hours, 2)
    total_kwh = pv_self_kwh + grid_kwh

    energy_kwh = [pv_self_kwh, grid_kwh]
    energy_labels = ["PV", "Netz"]

    if total_kwh:
        energy_percent = [round(e / total_kwh * 100, 2) for e in energy_kwh]
    else:
        # Nothing was drawn from either source; report 0 % for both rather
        # than dividing by zero.
        energy_percent = [0.0 for _ in energy_kwh]

    return pd.DataFrame(
        {
            "Energiebezug": energy_labels,
            "Energie [kWh]": energy_kwh,
            "Energie %": energy_percent,
            # NOTE(review): dividing by the duration of a single row gives the
            # sum of the row values, not an average over the whole period -
            # confirm whether an average was intended here.
            "Energie [kW]": [round(e / hours, 2) for e in energy_kwh],
        }
    )
256+
257+
258+
def print_pv_sums(
    main_df: pd.DataFrame, resolution: Union[str, pd.Timedelta, float]
) -> None:
    """Print formatted sums for each PV column.

    Every column whose name contains "PV #" is summed, scaled by the row
    duration in hours, sign-flipped, and printed with "." as thousands
    separator and "," as decimal separator.

    Args:
        main_df: DataFrame containing PV columns with energy data.
        resolution: Time resolution of each row in the DataFrame, either as
            something `pandas.to_timedelta` understands (e.g., "15min" or a
            `pd.Timedelta`) or as a plain number of hours.
    """
    import numbers

    # Plain numbers keep their previous meaning (a factor in hours); anything
    # else is converted the same way `compute_power_df` does it.
    if isinstance(resolution, numbers.Real):
        hours = resolution
    else:
        hours = pd.to_timedelta(resolution).total_seconds() / 3600

    # Swap "," and "." in one pass to get from the "1,234.56" format to
    # "1.234,56".
    swap_separators = str.maketrans(",.", ".,")

    pv_columns = [col for col in main_df.columns.tolist() if "PV #" in col]

    for pv in pv_columns:
        # Adding 0.0 turns a negative zero into a positive one, so an all-zero
        # column prints "0,00" rather than "-0,00".
        pv_sum = round(main_df[pv].sum() * hours * -1, 2) + 0.0
        formatted_sum = f"{pv_sum:,.2f}".translate(swap_separators)
        print(f"{pv:<7}: {formatted_sum} kWh")
273+
274+
275+
def _resolve_pv_columns(main_df: pd.DataFrame, pv_filter: List[str]) -> List[str]:
    """Map the PV filter entries to the matching "PV #..." column names."""
    if "Alle" in pv_filter:
        return [col for col in main_df.columns if "PV #" in col]

    pv_columns = []
    for entry in pv_filter:
        label = str(entry).strip()
        if label.startswith("PV #"):
            pv_columns.append(label)
        elif label.startswith("#"):
            pv_columns.append(f"PV {label}")
        else:
            pv_columns.append(f"PV #{label}")
    return pv_columns


def create_pv_analysis_df(
    main_df: pd.DataFrame,
    pv_filter: List[str],
    pvgrid_filter: str,
    pv_grid_filter_options: List[str],
) -> pd.DataFrame:
    """Create a DataFrame for PV analysis based on selected filters.

    Args:
        main_df: DataFrame containing the timestamp column, the grid column
            and one "PV #<id>" column per PV component.
        pv_filter: PV components to include. Each entry may be a bare ID
            ("1"), an ID with hash ("#1") or the full column name ("PV #1");
            if "Alle" is present, every "PV #" column is used.
        pvgrid_filter: The selected option; it is compared against
            `pv_grid_filter_options`.
        pv_grid_filter_options: Available options. The entry at index 1
            selects PV only, the entry at index 2 selects grid only, and any
            other value of `pvgrid_filter` selects the grid/PV split.

    Returns:
        For PV only and for the split: a long-format DataFrame with one row
        per timestamp and PV component, a "PV" column holding the label
        without its "PV " prefix (e.g. "#1") and the sign-flipped PV values.
        The split additionally carries the grid column divided by the number
        of selected PV components. For grid only: the timestamp and grid
        columns plus a constant "PV" column of "#".
    """
    pv_only = pvgrid_filter == pv_grid_filter_options[1]

    # Case 2: Only Grid
    if not pv_only and pvgrid_filter == pv_grid_filter_options[2]:
        df = main_df[[COLUMN_TIMESTAMP_NAMED, COLUMN_GRID_NAMED]].copy()
        df["PV"] = "#"
        return df

    # Case 1 (only PV) and case 3 (grid + PV split) differ only in whether the
    # grid column is carried along as an identifier column.
    pv_columns = _resolve_pv_columns(main_df, pv_filter)
    id_columns = [COLUMN_TIMESTAMP_NAMED]
    if not pv_only:
        id_columns.append(COLUMN_GRID_NAMED)

    df = main_df[id_columns + pv_columns].melt(
        id_vars=id_columns,
        value_vars=pv_columns,
        var_name="PV",
        value_name=COLUMN_PV_FEEDIN,
    )

    if not pv_only and pv_columns:
        # Melting repeats every grid value once per PV column; dividing by
        # the column count keeps the per-timestamp total unchanged.
        df[COLUMN_GRID_NAMED] /= len(pv_columns)

    df[COLUMN_PV_FEEDIN] *= -1
    # Strip the leading "PV " so that only the "#<id>" label remains.
    df["PV"] = df["PV"].str[3:]

    return df
332+
333+
334+
def create_battery_analysis_df(
    main_df: pd.DataFrame, bat_filter: List[str]
) -> pd.DataFrame:
    """Create a DataFrame for battery analysis based on selected filters.

    Args:
        main_df: DataFrame containing one "Batterie #<id>" column per battery.
            Timestamps are read from the timestamp column when it exists and
            from the index otherwise.
        bat_filter: Battery components to include. Each entry may be a bare ID
            ("1"), an ID with hash ("#1") or the full column name
            ("Batterie #1"); if "Alle" is present, every "Batterie #" column
            is used.

    Returns:
        A long-format DataFrame with one row per timestamp and battery: the
        timestamp column, a "Batterie" column holding the label without its
        "Batterie " prefix (e.g. "#1"), and the battery throughput values.
    """
    if "Alle" in bat_filter:
        bat_columns = [col for col in main_df.columns if "Batterie #" in col]
    else:
        bat_columns = []
        for entry in bat_filter:
            label = str(entry).strip()
            if label.startswith("Batterie #"):
                bat_columns.append(label)
            elif label.startswith("#"):
                bat_columns.append(f"Batterie {label}")
            else:
                bat_columns.append(f"Batterie #{label}")

    df = main_df[bat_columns].copy()

    # `transform_energy_dataframe` resets the index and keeps the timestamps
    # in their own column, so the index of its output is only a row counter.
    # Prefer the column and fall back to the index for frames that are still
    # indexed by time.
    if COLUMN_TIMESTAMP_NAMED in main_df.columns:
        df[COLUMN_TIMESTAMP_NAMED] = main_df[COLUMN_TIMESTAMP_NAMED]
    else:
        df[COLUMN_TIMESTAMP_NAMED] = main_df.index

    df = df.melt(
        id_vars=[COLUMN_TIMESTAMP_NAMED],
        value_vars=bat_columns,
        var_name="Batterie",
        value_name=COLUMN_BATTERY_NAMED,
    )
    # Strip the leading "Batterie " so that only the "#<id>" label remains.
    df["Batterie"] = df["Batterie"].str[9:]

    return df

0 commit comments

Comments
 (0)