Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
f3d594e
Expose resampling-related exceptions publicly
matthias-wende-frequenz Dec 1, 2023
a31fdcf
Add a wall clock attached timer for the resampler
llucax Jul 29, 2025
6423628
Add `ClocksInfo` tests
llucax Jul 30, 2025
fd57d3f
Add tests for `WallClockTimerConfig`
llucax Jul 30, 2025
5ba56fe
Add `conftest.py` for the wall clock timer tests
llucax Jul 30, 2025
a009455
Add utility functions for testing the wall clock timer
llucax Jul 30, 2025
6d34981
Add __init__.py files to resampling tests
llucax Jul 30, 2025
abab4e9
Add basic `WallClockTimer` tests
llucax Jul 30, 2025
f211dec
Add a pytest tool to compare time approximately
llucax Jul 30, 2025
49fa46b
Add a utility to compare `TickInfo` objects approximately
llucax Jul 30, 2025
cabf859
Add an utility to assert that a string matches a regex pattern
llucax Jul 30, 2025
9409cba
Add a fixture to monitor `asyncio.sleep()` calls
llucax Jul 30, 2025
29b49cb
Add utilities to create datetime/timedelta from seconds
llucax Jul 31, 2025
9bdd498
Add shortcut to get the monotonic (loop) time
llucax Jul 31, 2025
54862f2
Add utility to mock the clocks for complex ticking tests
llucax Jul 31, 2025
1a65397
Disable `pylint` `wrong-import-position` check
llucax Jul 31, 2025
c35d739
Add a fixture to easily use the `TimeDriver`
llucax Jul 31, 2025
d239515
Add ticking behavior tests
llucax Jul 31, 2025
dd11d09
Add custom comparison for `TickInfo` objects for pytest
llucax Jul 31, 2025
bd8d1f9
Expose more resampling symbols publicly
llucax Aug 1, 2025
1528e6b
Import `ResamplerConfig` from `timeseries` instead of `actor`
llucax Aug 1, 2025
396a097
Use internal import to avoid circular import issues
llucax Aug 1, 2025
77925a2
Add a new `ResamplerConfig2` that uses `WallClockTimerConfig`
llucax Aug 1, 2025
faa84ff
Add support for `WallClockTimer` to the `Resampler`
llucax Aug 1, 2025
0ace956
Import from public modules when possible
llucax Aug 1, 2025
6d5ee62
Make disabling of pylint checks local
llucax Aug 1, 2025
c4265f0
Add tests for the resampler using `ResampleConfig2`
llucax Aug 1, 2025
1c4f740
Use the new wall clock timer in benchmarks
llucax Aug 1, 2025
bffb950
Update documentation to use the new `ResamperConfig2`
llucax Aug 1, 2025
eb49fa5
Use `ResampleConfig2` in other tests
llucax Aug 1, 2025
a69ba24
Test wall clock timer in resampled moving window tests
llucax Aug 1, 2025
aa6099d
Update release notes
llucax Aug 1, 2025
ae383de
fixup! Add a wall clock attached timer for the resampler
llucax Aug 25, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,21 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
- A new configuration mode was added to the resampler (and thus the resampling actor and microgrid high-level interface). When passing a new `ResamplerConfig2` instance to the resampler, it will use a wall clock timer instead of a monotonic clock timer. This timer adjustes sleeps to account for drifts in the monotonic clock, and thus allows for more accurate resampling in cases where the monotonic clock drifts away from the wall clock. The monotonic clock timer option will be deprecated in the future, as it is not really suitable for resampling. The new `ResamplerConfig2` class accepts a `WallClockTimerConfig` to fine-tune the wall clock timer behavior, if necessary.

Example usage:

```python
from frequenz.sdk import microgrid
from frequenz.sdk.timeseries import ResamplerConfig2

await microgrid.initialize(
MICROGRID_API_URL,
# Just replace the old `ResamplerConfig` with the new `ResamplerConfig2`
resampler_config=ResamplerConfig2(resampling_period=timedelta(seconds=1.0)),
)
```

## Bug Fixes

<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
- When using the new wall clock timer in the resampmler, it will now resync to the system time if it drifts away for more than a resample period, and do dynamic adjustments to the timer if the monotonic clock has a small drift compared to the wall clock.
4 changes: 2 additions & 2 deletions benchmarks/power_distribution/power_distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from frequenz.quantities import Power

from frequenz.sdk import microgrid
from frequenz.sdk.actor import ResamplerConfig
from frequenz.sdk.microgrid import connection_manager
from frequenz.sdk.microgrid._power_distributing import (
ComponentPoolStatus,
Expand All @@ -29,6 +28,7 @@
Result,
Success,
)
from frequenz.sdk.timeseries import ResamplerConfig2

HOST = "microgrid.sandbox.api.frequenz.io"
PORT = 62060
Expand Down Expand Up @@ -140,7 +140,7 @@ async def run() -> None:
"""Create microgrid api and run tests."""
await microgrid.initialize(
"grpc://microgrid.sandbox.api.frequenz.io:62060",
ResamplerConfig(resampling_period=timedelta(seconds=1.0)),
ResamplerConfig2(resampling_period=timedelta(seconds=1.0)),
)

all_batteries: set[Component] = connection_manager.get().component_graph.components(
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/timeseries/resampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from datetime import datetime, timedelta, timezone
from timeit import timeit

from frequenz.sdk.timeseries import ResamplerConfig
from frequenz.sdk.timeseries import ResamplerConfig, ResamplerConfig2
from frequenz.sdk.timeseries._resampling._base_types import SourceProperties
from frequenz.sdk.timeseries._resampling._resampler import _ResamplingHelper

Expand All @@ -25,7 +25,7 @@ def _benchmark_resampling_helper(resamples: int, samples: int) -> None:
"""Benchmark the resampling helper."""
helper = _ResamplingHelper(
"benchmark",
ResamplerConfig(
ResamplerConfig2(
resampling_period=timedelta(seconds=1.0),
max_data_age_in_periods=3.0,
resampling_function=nop,
Expand Down
8 changes: 4 additions & 4 deletions docs/tutorials/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import asyncio

from datetime import timedelta
from frequenz.sdk import microgrid
from frequenz.sdk.actor import ResamplerConfig
from frequenz.sdk.timeseries import ResamplerConfig2
```

## Create the application skeleton
Expand All @@ -49,7 +49,7 @@ async def run() -> None:
# Initialize the microgrid
await microgrid.initialize(
server_url,
ResamplerConfig(resampling_period=timedelta(seconds=1)),
ResamplerConfig2(resampling_period=timedelta(seconds=1)),
)

# Define your application logic here
Expand Down Expand Up @@ -100,7 +100,7 @@ import asyncio

from datetime import timedelta
from frequenz.sdk import microgrid
from frequenz.sdk.actor import ResamplerConfig
from frequenz.sdk.timeseries import ResamplerConfig2

async def run() -> None:
# This points to the default Frequenz microgrid sandbox
Expand All @@ -109,7 +109,7 @@ async def run() -> None:
# Initialize the microgrid
await microgrid.initialize(
server_url,
ResamplerConfig(resampling_period=timedelta(seconds=1)),
ResamplerConfig2(resampling_period=timedelta(seconds=1)),
)

# Define your application logic here
Expand Down
4 changes: 2 additions & 2 deletions examples/battery_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from frequenz.channels import merge

from frequenz.sdk import microgrid
from frequenz.sdk.actor import ResamplerConfig
from frequenz.sdk.timeseries import ResamplerConfig2

MICROGRID_API_URL = "grpc://microgrid.sandbox.api.frequenz.io:62060"

Expand All @@ -24,7 +24,7 @@ async def main() -> None:

await microgrid.initialize(
MICROGRID_API_URL,
resampler_config=ResamplerConfig(resampling_period=timedelta(seconds=1.0)),
resampler_config=ResamplerConfig2(resampling_period=timedelta(seconds=1.0)),
)

battery_pool = microgrid.new_battery_pool(priority=5)
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ disable = [
"unnecessary-lambda-assignment",
"unused-import",
"unused-variable",
"wrong-import-position",
]

[tool.pylint.design]
Expand Down
2 changes: 1 addition & 1 deletion src/frequenz/sdk/microgrid/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@

from datetime import timedelta

from ..actor import ResamplerConfig
from ..timeseries._resampling._config import ResamplerConfig
from . import _data_pipeline, connection_manager
from ._data_pipeline import (
consumer,
Expand Down
34 changes: 31 additions & 3 deletions src/frequenz/sdk/timeseries/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,46 @@
from ._fuse import Fuse
from ._moving_window import MovingWindow
from ._periodic_feature_extractor import PeriodicFeatureExtractor
from ._resampling._base_types import SourceProperties
from ._resampling._config import ResamplerConfig, ResamplingFunction
from ._resampling._base_types import Sink, Source, SourceProperties
from ._resampling._config import (
DEFAULT_BUFFER_LEN_INIT,
DEFAULT_BUFFER_LEN_MAX,
DEFAULT_BUFFER_LEN_WARN,
ResamplerConfig,
ResamplerConfig2,
ResamplingFunction,
ResamplingFunction2,
)
from ._resampling._exceptions import ResamplingError, SourceStoppedError
from ._resampling._wall_clock_timer import (
ClocksInfo,
TickInfo,
WallClockTimer,
WallClockTimerConfig,
)

__all__ = [
"Bounds",
"ClocksInfo",
"DEFAULT_BUFFER_LEN_INIT",
"DEFAULT_BUFFER_LEN_MAX",
"DEFAULT_BUFFER_LEN_WARN",
"Fuse",
"MovingWindow",
"PeriodicFeatureExtractor",
"ResamplerConfig",
"ReceiverFetcher",
"ResamplerConfig",
"ResamplerConfig2",
"ResamplingError",
"ResamplingFunction",
"ResamplingFunction2",
"Sample",
"Sample3Phase",
"Sink",
"Source",
"SourceProperties",
"SourceStoppedError",
"TickInfo",
"WallClockTimer",
"WallClockTimerConfig",
]
173 changes: 172 additions & 1 deletion src/frequenz/sdk/timeseries/_resampling/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
import logging
import statistics
from collections.abc import Sequence
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Protocol

from frequenz.core.datetime import UNIX_EPOCH

from ._base_types import SourceProperties
from ._wall_clock_timer import WallClockTimerConfig

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -210,3 +211,173 @@ def __post_init__(self) -> None:
raise ValueError(
f"align_to ({self.align_to}) should be a timezone aware datetime"
)


class ResamplingFunction2(Protocol):
"""Combine multiple samples into a new one.

A resampling function produces a new sample based on a list of pre-existing
samples. It can do "upsampling" when the data rate of the `input_samples`
period is smaller than the `resampling_period`, or "downsampling" if it is
bigger.

In general, a resampling window is the same as the `resampling_period`, and
this function might receive input samples from multiple windows in the past to
enable extrapolation, but no samples from the future (so the timestamp of the
new sample that is going to be produced will always be bigger than the biggest
timestamp in the input data).
"""

def __call__(
self,
input_samples: Sequence[tuple[datetime, float]],
resampler_config: ResamplerConfig | ResamplerConfig2,
source_properties: SourceProperties,
/,
) -> float:
"""Call the resampling function.

Args:
input_samples: The sequence of pre-existing samples, where the first item is
the timestamp of the sample, and the second is the value of the sample.
The sequence must be non-empty.
resampler_config: The configuration of the resampler calling this
function.
source_properties: The properties of the source being resampled.

Returns:
The value of new sample produced after the resampling.
"""
... # pylint: disable=unnecessary-ellipsis


@dataclass(frozen=True)
class ResamplerConfig2(ResamplerConfig):
"""Resampler configuration."""

resampling_period: timedelta
"""The resampling period.

This is the time it passes between resampled data should be calculated.

It must be a positive time span.
"""

max_data_age_in_periods: float = 3.0
"""The maximum age a sample can have to be considered *relevant* for resampling.

Expressed in number of periods, where period is the `resampling_period`
if we are downsampling (resampling period bigger than the input period) or
the *input sampling period* if we are upsampling (input period bigger than
the resampling period).

It must be bigger than 1.0.

Example:
If `resampling_period` is 3 seconds, the input sampling period is
1 and `max_data_age_in_periods` is 2, then data older than 3*2
= 6 seconds will be discarded when creating a new sample and never
passed to the resampling function.

If `resampling_period` is 3 seconds, the input sampling period is
5 and `max_data_age_in_periods` is 2, then data older than 5*2
= 10 seconds will be discarded when creating a new sample and never
passed to the resampling function.
"""

resampling_function: ResamplingFunction2 = lambda samples, _, __: statistics.fmean(
s[1] for s in samples
)
"""The resampling function.

This function will be applied to the sequence of relevant samples at
a given time. The result of the function is what is sent as the resampled
value.
"""

initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT
"""The initial length of the resampling buffer.

The buffer could grow or shrink depending on the source properties,
like sampling rate, to make sure all the requested past sampling periods
can be stored.

It must be at least 1 and at most `max_buffer_len`.
"""

warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN
"""The minimum length of the resampling buffer that will emit a warning.

If a buffer grows bigger than this value, it will emit a warning in the
logs, so buffers don't grow too big inadvertently.

It must be at least 1 and at most `max_buffer_len`.
"""

max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX
"""The maximum length of the resampling buffer.

Buffers won't be allowed to grow beyond this point even if it would be
needed to keep all the requested past sampling periods. An error will be
emitted in the logs if the buffer length needs to be truncated to this
value.

It must be at bigger than `warn_buffer_len`.
"""

align_to: datetime | None = field(default=None, init=False)
"""Deprecated: Use timer_config.align_to instead."""

timer_config: WallClockTimerConfig | None = None
"""The custom configuration of the wall clock timer used to keep track of time.

If not provided or `None`, a configuration will be created by passing the
[`resampling_period`][frequenz.sdk.timeseries.ResamplerConfig2.resampling_period] to
the [`from_interval()`][frequenz.sdk.timeseries.WallClockTimerConfig.from_interval]
method.
"""

def __post_init__(self) -> None:
"""Check that config values are valid.

Raises:
ValueError: If any value is out of range.
"""
if self.resampling_period.total_seconds() < 0.0:
raise ValueError(
f"resampling_period ({self.resampling_period}) must be positive"
)
if self.max_data_age_in_periods < 1.0:
raise ValueError(
f"max_data_age_in_periods ({self.max_data_age_in_periods}) should be at least 1.0"
)
if self.warn_buffer_len < 1:
raise ValueError(
f"warn_buffer_len ({self.warn_buffer_len}) should be at least 1"
)
if self.max_buffer_len <= self.warn_buffer_len:
raise ValueError(
f"max_buffer_len ({self.max_buffer_len}) should "
f"be bigger than warn_buffer_len ({self.warn_buffer_len})"
)

if self.initial_buffer_len < 1:
raise ValueError(
f"initial_buffer_len ({self.initial_buffer_len}) should at least 1"
)
if self.initial_buffer_len > self.max_buffer_len:
raise ValueError(
f"initial_buffer_len ({self.initial_buffer_len}) is bigger "
f"than max_buffer_len ({self.max_buffer_len}), use a smaller "
"initial_buffer_len or a bigger max_buffer_len"
)
if self.initial_buffer_len > self.warn_buffer_len:
_logger.warning(
"initial_buffer_len (%s) is bigger than warn_buffer_len (%s)",
self.initial_buffer_len,
self.warn_buffer_len,
)
if self.align_to is not None:
raise ValueError(
f"align_to ({self.align_to}) must be specified via timer_config"
)
Loading