Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- Added consistent logger setup across all modules for structured logging and improved observability. Example notebooks updated to demonstrate logger usage.
- The signature for passing config files MicrogridConfig.load_config() has been changed to accept a path a list of paths and a directory containing the config files.
- `MicrogridData` class needs to be initialized with a `MicrogridConfig` object instead of a path to config file(s).
- Added a `data_processing` module to `reporting` that provides a set of functions for processing, enriching, and analyzing time-series energy data from microgrid systems.

## Bug Fixes

Expand Down
361 changes: 361 additions & 0 deletions src/frequenz/lib/notebooks/reporting/data_processing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,361 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Data processing utilities for microgrid energy reporting.

This module provides a set of functions for processing, enriching, and analyzing
time-series energy data from microgrid systems. It focuses on preparing data for
PV (photovoltaic), battery, and grid energy flows, transforming it into a consistent
structure for visualization, reporting, and analysis.

Features
--------
- Enriches raw energy data with derived columns such as:
- PV production, self-consumption, feed-in, and battery charging.
- Net grid import and PV self-consumption share.
- Handles time zone localization and conversion to Europe/Berlin.
- Dynamically renames columns to more descriptive names, including
mapping component IDs (e.g., "PV #1", "Batterie #2").
- Provides summary energy mix breakdowns (PV vs grid) in kWh, % share, and average kW.
- Prepares tailored DataFrames for PV and battery analysis, supporting flexible
filtering by component.

Main Functions
--------------
- `transform_energy_dataframe(df, component_types, mcfg)`:
Transforms a raw DataFrame with energy metrics into an enriched,
user-friendly format, adding PV, battery, and grid metrics.

- `compute_power_df(main_df, resolution)`:
Computes total energy drawn from PV and grid sources over the given resolution,
returning a summary DataFrame with kWh, percentage, and average kW.

- `print_pv_sums(main_df, resolution)`:
Prints total PV feed-in sums for each individual PV component
in a localized numeric format.

- `create_pv_analysis_df(main_df, pv_filter, pvgrid_filter, pv_grid_filter_options)`:
Generates a DataFrame for PV analysis based on selected PV components
and whether to analyze PV alone, grid alone, or a grid/PV split.

- `create_battery_analysis_df(main_df, bat_filter)`:
Creates a DataFrame for analyzing battery throughput, reshaping
it to long format for multi-battery analysis.

Usage
-----
Typical usage involves:
1. Loading a raw DataFrame with time-indexed energy measurements.
2. Calling `transform_energy_dataframe` to process and enrich it.
3. Using the resulting DataFrames to generate summaries,
for example with `compute_power_df`, `create_pv_analysis_df`, or
`create_battery_analysis_df` for visualization.
"""

from typing import Any, Dict, Iterable, List, Tuple, Union

import pandas as pd

# Constants
TZ_NAME = "Europe/Berlin"
COLUMN_TIMESTAMP = "timestamp"
COLUMN_TIMESTAMP_NAMED = "Zeitpunkt"
COLUMN_GRID = "grid"
COLUMN_GRID_NAMED = "Netzanschluss"
COLUMN_NET_IMPORT = "Netzbezug"
COLUMN_CONSUMPTION = "consumption"
COLUMN_CONSUMPTION_NAMED = "Brutto Gesamtverbrauch"
COLUMN_BATTERY = "battery"
COLUMN_BATTERY_POS = "battery_pos"
COLUMN_BATTERY_NAMED = "Batterie Durchsatz"
COLUMN_PV = "pv"
COLUMN_PV_PROD = "PV Produktion"
COLUMN_PV_NEG = "pv_neg"
COLUMN_PV_EXCESS = "pv_excess"
COLUMN_PV_FEEDIN = "PV Einspeisung"
COLUMN_PV_SELF = "PV Eigenverbrauch"
COLUMN_PV_BAT = "pv_bat"
COLUMN_PV_IN_BAT = "PV in Batterie"
COLUMN_PV_SHARE = "PV Eigenverbrauchsanteil"
COLUMN_PV_THROUGHPUT = "PV Durchsatz"


def transform_energy_dataframe(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this main function it would be good to have a test.

df: pd.DataFrame,
component_types: List[str],
mcfg: Any,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You shouldn't use the Any type annotation here. You can use the actual type, MicrogridConfig instead.

) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""Transform and enrich energy dataframe.

This function processes a raw DataFrame containing energy metrics,
adding derived columns for PV production, battery throughput, and grid metrics.

Args:
df: Raw DataFrame with energy metrics, expected to have a datetime index.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the input columns required on the input data?

component_types: List of component types present in the DataFrame (e.g., ["pv", "battery"]).
mcfg: Microgrid configuration object providing component type IDs and other metadata.

Returns:
A tuple containing:
- `main_df`: A DataFrame with main columns for visualization and reporting.
- `df_renamed`: A fully enriched DataFrame.
"""
# Ensure the DataFrame has a datetime index
df = df.reset_index()

# Enrich with PV-related columns
if "pv" in component_types:
df[COLUMN_PV_PROD] = -df.get(COLUMN_PV_NEG, 0)
df[COLUMN_PV_EXCESS] = (df[COLUMN_PV_PROD] - df[COLUMN_CONSUMPTION]).clip(
lower=0
)

if "battery" in component_types:
df[COLUMN_PV_IN_BAT] = df[[COLUMN_PV_EXCESS, COLUMN_BATTERY_POS]].min(
axis=1
)
else:
df[COLUMN_PV_IN_BAT] = 0

df[COLUMN_PV_FEEDIN] = df[COLUMN_PV_EXCESS] - df[COLUMN_PV_IN_BAT]
df[COLUMN_PV_SELF] = (df[COLUMN_PV_PROD] - df[COLUMN_PV_EXCESS]).clip(lower=0)
df[COLUMN_PV_SHARE] = df[COLUMN_PV_SELF] / df[COLUMN_CONSUMPTION].replace(
0, pd.NA
)

# Convert timestamp to Berlin time
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would move this to the beginning of this function.

if df[COLUMN_TIMESTAMP].dt.tz is None:
df[COLUMN_TIMESTAMP] = df[COLUMN_TIMESTAMP].dt.tz_localize("UTC")
df[COLUMN_TIMESTAMP] = df[COLUMN_TIMESTAMP].dt.tz_convert(TZ_NAME)

# Basic renaming
rename_map: Dict[str, str] = {
COLUMN_TIMESTAMP: COLUMN_TIMESTAMP_NAMED,
COLUMN_GRID: COLUMN_GRID_NAMED,
COLUMN_CONSUMPTION: COLUMN_CONSUMPTION_NAMED,
}

if "battery" in component_types:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need the if clauses for component types.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are customer who do not have a battery or pv and then I do not want the specific metrics or graphs to be displayed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this is just the rename map, it only renames columns that exist. You can define the full mapping independent of the data you select.

rename_map[COLUMN_BATTERY] = COLUMN_BATTERY_NAMED

if "pv" in component_types:
rename_map.update(
{
"pv": COLUMN_PV_THROUGHPUT,
COLUMN_PV_PROD: COLUMN_PV_PROD,
COLUMN_PV_SELF: COLUMN_PV_SELF,
COLUMN_PV_FEEDIN: COLUMN_PV_FEEDIN,
COLUMN_PV_SHARE: COLUMN_PV_SHARE,
}
)
if "battery" in component_types:
rename_map[COLUMN_PV_BAT] = COLUMN_PV_IN_BAT

# Rename individual component IDs
single_comp = [col for col in df.columns if col.isdigit()]

if "battery" in component_types:
battery_ids = {
str(i) for i in mcfg.component_type_ids(component_type="battery")
}
rename_map.update(
{col: f"Batterie #{col}" for col in single_comp if col in battery_ids}
)

if "pv" in component_types:
pv_ids = {str(i) for i in mcfg.component_type_ids(component_type="pv")}
rename_map.update({col: f"PV #{col}" for col in single_comp if col in pv_ids})

df_renamed = df.rename(columns=rename_map)

# Add derived net import column
df_renamed[COLUMN_NET_IMPORT] = df_renamed[COLUMN_GRID_NAMED].clip(lower=0)

# Select main columns for compact display
def _get_main_columns(
columns: Iterable[str], component_types: List[str]
) -> List[str]:
base = {
COLUMN_TIMESTAMP_NAMED,
COLUMN_GRID_NAMED,
COLUMN_NET_IMPORT,
COLUMN_CONSUMPTION_NAMED,
}

if "battery" in component_types:
base.add(COLUMN_BATTERY_NAMED)

if "pv" in component_types:
base.update(
{
COLUMN_PV_THROUGHPUT,
COLUMN_PV_PROD,
COLUMN_PV_SELF,
COLUMN_PV_FEEDIN,
}
)
if "battery" in component_types:
base.update({COLUMN_PV_IN_BAT, COLUMN_PV_SHARE})

# Add individual component columns like "PV #1", "Batterie #3", etc.
base.update({col for col in columns if "#" in col})

return [col for col in columns if col in base]

main_df = df_renamed[_get_main_columns(df_renamed.columns, component_types)]

return main_df, df_renamed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need two data frames as ouput?



def compute_power_df(
main_df: pd.DataFrame, resolution: Union[str, pd.Timedelta]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

str | pd.Timedelta

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not a datetime.timedelta which we usually use?

) -> pd.DataFrame:
"""Compute energy mix (PV vs grid) and return a summary power DataFrame.

Args:
main_df: DataFrame with energy data, including 'Netzbezug'
and optionally 'PV Eigenverbrauch'.
resolution: Time resolution of each row in the DataFrame (e.g., "15min").

Returns:
A DataFrame summarizing the energy source mix in kWh, %, and average kW.
"""
resolution = pd.to_timedelta(resolution)
hours = resolution.total_seconds() / 3600

# Calculate energy from grid
grid_kwh = round(main_df[COLUMN_NET_IMPORT].sum() * hours, 2)

if COLUMN_PV_SELF in main_df.columns:
# Calculate energy from PV
pv_self_kwh = round(main_df[COLUMN_PV_SELF].sum() * hours, 2)
total_kwh = pv_self_kwh + grid_kwh

energy_kwh = [pv_self_kwh, grid_kwh]
energy_labels = ["PV", "Netz"]

return pd.DataFrame(
{
"Energiebezug": energy_labels,
"Energie [kWh]": energy_kwh,
"Energie %": [round(e / total_kwh * 100, 2) for e in energy_kwh],
"Energie [kW]": [round(e / hours, 2) for e in energy_kwh],
}
)

# Only grid consumption available
return pd.DataFrame(
{
"Energiebezug": ["Netz"],
"Energie [kWh]": [grid_kwh],
"Energie %": [100.0],
"Energie [kW]": [round(grid_kwh / hours, 2)],
}
)


def print_pv_sums(main_df: pd.DataFrame, resolution: pd.Timedelta) -> None:
"""Print formatted sums for each PV column.

Args:
main_df: DataFrame containing PV columns with energy data.
resolution: Time resolution of each row in the DataFrame (e.g., "15min").
"""
pv_columns = [col for col in main_df.columns.tolist() if "PV #" in col]

for pv in pv_columns:
pv_sum = round(main_df[pv].sum() * resolution * -1, 2)
formatted_sum = (
f"{pv_sum:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".")
)
print(f"{pv:<7}: {formatted_sum} kWh")


def create_pv_analysis_df(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really get what this and the next function are required for.

main_df: pd.DataFrame,
pv_filter: List[str],
pvgrid_filter: str,
pv_grid_filter_options: List[str],
) -> pd.DataFrame:
"""Create a DataFrame for PV analysis based on selected filters.

Args:
main_df: DataFrame containing PV and grid data.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which columns?

pv_filter: List of PV components to include (e.g., ["1", "2"] or ["Alle"]).
pvgrid_filter: Filter option for PV and grid analysis (e.g., "PV", "Grid", "PV + Grid").
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are these three modes?

pv_grid_filter_options: List of available filter options for PV and grid.
Returns:
A DataFrame with PV feed-in data, reshaped for analysis.
"""
# Case 1: Only PV
if pvgrid_filter == pv_grid_filter_options[1]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should check for the strings. Otherwise you rely on something passed in which you cannot control.

pv_columns = (
[col for col in main_df.columns if "PV #" in col]
if "Alle" in pv_filter
else [f"PV {pv}" for pv in pv_filter]
)
df = main_df[[COLUMN_TIMESTAMP_NAMED] + pv_columns].copy()
df = df.melt(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it converted to long format?

id_vars=[COLUMN_TIMESTAMP_NAMED],
value_vars=pv_columns,
var_name="PV",
value_name=COLUMN_PV_FEEDIN,
)
df[COLUMN_PV_FEEDIN] *= -1
df["PV"] = df["PV"].str[3:]

# Case 2: Only Grid
elif pvgrid_filter == pv_grid_filter_options[2]:
df = main_df[[COLUMN_TIMESTAMP_NAMED, COLUMN_GRID_NAMED]].copy()
df["PV"] = "#"

# Case 3: Grid + PV split
else:
pv_columns = (
[col for col in main_df.columns if "PV #" in col]
if "Alle" in pv_filter
else [f"PV {pv}" for pv in pv_filter]
)
df = main_df[[COLUMN_TIMESTAMP_NAMED, COLUMN_GRID_NAMED] + pv_columns].copy()
df = df.melt(
id_vars=[COLUMN_TIMESTAMP_NAMED, COLUMN_GRID_NAMED],
value_vars=pv_columns,
var_name="PV",
value_name=COLUMN_PV_FEEDIN,
)
df[COLUMN_GRID_NAMED] /= len(pv_columns)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this calculating?

df[COLUMN_PV_FEEDIN] *= -1
df["PV"] = df["PV"].str[3:]

return df


def create_battery_analysis_df(
main_df: pd.DataFrame, bat_filter: List[str]
) -> pd.DataFrame:
"""Create a DataFrame for battery analysis based on selected filters.

Args:
main_df: DataFrame containing battery data.
bat_filter: List of battery components to include (e.g., ["1", "2"] or ["Alle"]).
Returns:
A DataFrame with battery throughput data, reshaped for analysis.
"""
bat_columns = (
[col for col in main_df.columns if "Batterie #" in col]
if "Alle" in bat_filter
else [f"Batterie {i}" for i in bat_filter]
)

df = main_df[bat_columns].copy()
df[COLUMN_TIMESTAMP_NAMED] = main_df.index

df = df.melt(
id_vars=[COLUMN_TIMESTAMP_NAMED],
value_vars=bat_columns,
var_name="Batterie",
value_name=COLUMN_BATTERY_NAMED,
)
df["Batterie"] = df["Batterie"].str[9:]

return df
Loading