Skip to content

Commit 73fa586

Browse files
authored
Add a wall clock attached timer (#1249)
This timer uses the wall clock to trigger ticks and handles discrepancies between the wall clock and monotonic time. Since sleeping is performed using monotonic time, differences between the two clocks can occur. When the wall clock progresses slower than monotonic time, it is referred to as *compression* (wall clock time appears in the past relative to monotonic time). Conversely, when the wall clock progresses faster, it is called *expansion* (wall clock time appears in the future relative to monotonic time). If these differences exceed a configured threshold, a warning is emitted. If the difference becomes excessively large, it is treated as a *time jump*. Time jumps can occur, for example, when the wall clock is adjusted by NTP after being out of sync for an extended period. In such cases, the timer resynchronizes with the wall clock and triggers an immediate tick.
2 parents c2e45b3 + 88c6c53 commit 73fa586

35 files changed

+3176
-125
lines changed

RELEASE_NOTES.md

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,21 @@
1212

1313
## New Features
1414

15-
<!-- Here goes the main new features and examples or instructions on how to use them -->
15+
- A new configuration mode was added to the resampler (and thus the resampling actor and microgrid high-level interface). When passing a new `ResamplerConfig2` instance to the resampler, it will use a wall clock timer instead of a monotonic clock timer. This timer adjustes sleeps to account for drifts in the monotonic clock, and thus allows for more accurate resampling in cases where the monotonic clock drifts away from the wall clock. The monotonic clock timer option will be deprecated in the future, as it is not really suitable for resampling. The new `ResamplerConfig2` class accepts a `WallClockTimerConfig` to fine-tune the wall clock timer behavior, if necessary.
16+
17+
Example usage:
18+
19+
```python
20+
from frequenz.sdk import microgrid
21+
from frequenz.sdk.timeseries import ResamplerConfig2
22+
23+
await microgrid.initialize(
24+
MICROGRID_API_URL,
25+
# Just replace the old `ResamplerConfig` with the new `ResamplerConfig2`
26+
resampler_config=ResamplerConfig2(resampling_period=timedelta(seconds=1.0)),
27+
)
28+
```
1629

1730
## Bug Fixes
1831

19-
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
32+
- When using the new wall clock timer in the resampmler, it will now resync to the system time if it drifts away for more than a resample period, and do dynamic adjustments to the timer if the monotonic clock has a small drift compared to the wall clock.

benchmarks/power_distribution/power_distributor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
from frequenz.quantities import Power
1818

1919
from frequenz.sdk import microgrid
20-
from frequenz.sdk.actor import ResamplerConfig
2120
from frequenz.sdk.microgrid import connection_manager
2221
from frequenz.sdk.microgrid._power_distributing import (
2322
ComponentPoolStatus,
@@ -29,6 +28,7 @@
2928
Result,
3029
Success,
3130
)
31+
from frequenz.sdk.timeseries import ResamplerConfig2
3232

3333
HOST = "microgrid.sandbox.api.frequenz.io"
3434
PORT = 62060
@@ -140,7 +140,7 @@ async def run() -> None:
140140
"""Create microgrid api and run tests."""
141141
await microgrid.initialize(
142142
"grpc://microgrid.sandbox.api.frequenz.io:62060",
143-
ResamplerConfig(resampling_period=timedelta(seconds=1.0)),
143+
ResamplerConfig2(resampling_period=timedelta(seconds=1.0)),
144144
)
145145

146146
all_batteries: set[Component] = connection_manager.get().component_graph.components(

benchmarks/timeseries/resampling.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from datetime import datetime, timedelta, timezone
88
from timeit import timeit
99

10-
from frequenz.sdk.timeseries import ResamplerConfig
10+
from frequenz.sdk.timeseries import ResamplerConfig, ResamplerConfig2
1111
from frequenz.sdk.timeseries._resampling._base_types import SourceProperties
1212
from frequenz.sdk.timeseries._resampling._resampler import _ResamplingHelper
1313

@@ -25,7 +25,7 @@ def _benchmark_resampling_helper(resamples: int, samples: int) -> None:
2525
"""Benchmark the resampling helper."""
2626
helper = _ResamplingHelper(
2727
"benchmark",
28-
ResamplerConfig(
28+
ResamplerConfig2(
2929
resampling_period=timedelta(seconds=1.0),
3030
max_data_age_in_periods=3.0,
3131
resampling_function=nop,

docs/tutorials/getting_started.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import asyncio
3232

3333
from datetime import timedelta
3434
from frequenz.sdk import microgrid
35-
from frequenz.sdk.actor import ResamplerConfig
35+
from frequenz.sdk.timeseries import ResamplerConfig2
3636
```
3737

3838
## Create the application skeleton
@@ -49,7 +49,7 @@ async def run() -> None:
4949
# Initialize the microgrid
5050
await microgrid.initialize(
5151
server_url,
52-
ResamplerConfig(resampling_period=timedelta(seconds=1)),
52+
ResamplerConfig2(resampling_period=timedelta(seconds=1)),
5353
)
5454

5555
# Define your application logic here
@@ -100,7 +100,7 @@ import asyncio
100100

101101
from datetime import timedelta
102102
from frequenz.sdk import microgrid
103-
from frequenz.sdk.actor import ResamplerConfig
103+
from frequenz.sdk.timeseries import ResamplerConfig2
104104

105105
async def run() -> None:
106106
# This points to the default Frequenz microgrid sandbox
@@ -109,7 +109,7 @@ async def run() -> None:
109109
# Initialize the microgrid
110110
await microgrid.initialize(
111111
server_url,
112-
ResamplerConfig(resampling_period=timedelta(seconds=1)),
112+
ResamplerConfig2(resampling_period=timedelta(seconds=1)),
113113
)
114114

115115
# Define your application logic here

examples/battery_pool.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from frequenz.channels import merge
1212

1313
from frequenz.sdk import microgrid
14-
from frequenz.sdk.actor import ResamplerConfig
14+
from frequenz.sdk.timeseries import ResamplerConfig2
1515

1616
MICROGRID_API_URL = "grpc://microgrid.sandbox.api.frequenz.io:62060"
1717

@@ -24,7 +24,7 @@ async def main() -> None:
2424

2525
await microgrid.initialize(
2626
MICROGRID_API_URL,
27-
resampler_config=ResamplerConfig(resampling_period=timedelta(seconds=1.0)),
27+
resampler_config=ResamplerConfig2(resampling_period=timedelta(seconds=1.0)),
2828
)
2929

3030
battery_pool = microgrid.new_battery_pool(priority=5)

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ disable = [
158158
"unnecessary-lambda-assignment",
159159
"unused-import",
160160
"unused-variable",
161+
"wrong-import-position",
161162
]
162163

163164
[tool.pylint.design]

src/frequenz/sdk/microgrid/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@
315315

316316
from datetime import timedelta
317317

318-
from ..actor import ResamplerConfig
318+
from ..timeseries._resampling._config import ResamplerConfig
319319
from . import _data_pipeline, connection_manager
320320
from ._data_pipeline import (
321321
consumer,

src/frequenz/sdk/timeseries/__init__.py

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,18 +40,46 @@
4040
from ._fuse import Fuse
4141
from ._moving_window import MovingWindow
4242
from ._periodic_feature_extractor import PeriodicFeatureExtractor
43-
from ._resampling._base_types import SourceProperties
44-
from ._resampling._config import ResamplerConfig, ResamplingFunction
43+
from ._resampling._base_types import Sink, Source, SourceProperties
44+
from ._resampling._config import (
45+
DEFAULT_BUFFER_LEN_INIT,
46+
DEFAULT_BUFFER_LEN_MAX,
47+
DEFAULT_BUFFER_LEN_WARN,
48+
ResamplerConfig,
49+
ResamplerConfig2,
50+
ResamplingFunction,
51+
ResamplingFunction2,
52+
)
53+
from ._resampling._exceptions import ResamplingError, SourceStoppedError
54+
from ._resampling._wall_clock_timer import (
55+
ClocksInfo,
56+
TickInfo,
57+
WallClockTimer,
58+
WallClockTimerConfig,
59+
)
4560

4661
__all__ = [
4762
"Bounds",
63+
"ClocksInfo",
64+
"DEFAULT_BUFFER_LEN_INIT",
65+
"DEFAULT_BUFFER_LEN_MAX",
66+
"DEFAULT_BUFFER_LEN_WARN",
4867
"Fuse",
4968
"MovingWindow",
5069
"PeriodicFeatureExtractor",
51-
"ResamplerConfig",
5270
"ReceiverFetcher",
71+
"ResamplerConfig",
72+
"ResamplerConfig2",
73+
"ResamplingError",
5374
"ResamplingFunction",
75+
"ResamplingFunction2",
5476
"Sample",
5577
"Sample3Phase",
78+
"Sink",
79+
"Source",
5680
"SourceProperties",
81+
"SourceStoppedError",
82+
"TickInfo",
83+
"WallClockTimer",
84+
"WallClockTimerConfig",
5785
]

src/frequenz/sdk/timeseries/_resampling/_config.py

Lines changed: 172 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,14 @@
88
import logging
99
import statistics
1010
from collections.abc import Sequence
11-
from dataclasses import dataclass
11+
from dataclasses import dataclass, field
1212
from datetime import datetime, timedelta
1313
from typing import Protocol
1414

1515
from frequenz.core.datetime import UNIX_EPOCH
1616

1717
from ._base_types import SourceProperties
18+
from ._wall_clock_timer import WallClockTimerConfig
1819

1920
_logger = logging.getLogger(__name__)
2021

@@ -210,3 +211,173 @@ def __post_init__(self) -> None:
210211
raise ValueError(
211212
f"align_to ({self.align_to}) should be a timezone aware datetime"
212213
)
214+
215+
216+
class ResamplingFunction2(Protocol):
217+
"""Combine multiple samples into a new one.
218+
219+
A resampling function produces a new sample based on a list of pre-existing
220+
samples. It can do "upsampling" when the data rate of the `input_samples`
221+
period is smaller than the `resampling_period`, or "downsampling" if it is
222+
bigger.
223+
224+
In general, a resampling window is the same as the `resampling_period`, and
225+
this function might receive input samples from multiple windows in the past to
226+
enable extrapolation, but no samples from the future (so the timestamp of the
227+
new sample that is going to be produced will always be bigger than the biggest
228+
timestamp in the input data).
229+
"""
230+
231+
def __call__(
232+
self,
233+
input_samples: Sequence[tuple[datetime, float]],
234+
resampler_config: ResamplerConfig | ResamplerConfig2,
235+
source_properties: SourceProperties,
236+
/,
237+
) -> float:
238+
"""Call the resampling function.
239+
240+
Args:
241+
input_samples: The sequence of pre-existing samples, where the first item is
242+
the timestamp of the sample, and the second is the value of the sample.
243+
The sequence must be non-empty.
244+
resampler_config: The configuration of the resampler calling this
245+
function.
246+
source_properties: The properties of the source being resampled.
247+
248+
Returns:
249+
The value of new sample produced after the resampling.
250+
"""
251+
... # pylint: disable=unnecessary-ellipsis
252+
253+
254+
@dataclass(frozen=True)
255+
class ResamplerConfig2(ResamplerConfig):
256+
"""Resampler configuration."""
257+
258+
resampling_period: timedelta
259+
"""The resampling period.
260+
261+
This is the time it passes between resampled data should be calculated.
262+
263+
It must be a positive time span.
264+
"""
265+
266+
max_data_age_in_periods: float = 3.0
267+
"""The maximum age a sample can have to be considered *relevant* for resampling.
268+
269+
Expressed in number of periods, where period is the `resampling_period`
270+
if we are downsampling (resampling period bigger than the input period) or
271+
the *input sampling period* if we are upsampling (input period bigger than
272+
the resampling period).
273+
274+
It must be bigger than 1.0.
275+
276+
Example:
277+
If `resampling_period` is 3 seconds, the input sampling period is
278+
1 and `max_data_age_in_periods` is 2, then data older than 3*2
279+
= 6 seconds will be discarded when creating a new sample and never
280+
passed to the resampling function.
281+
282+
If `resampling_period` is 3 seconds, the input sampling period is
283+
5 and `max_data_age_in_periods` is 2, then data older than 5*2
284+
= 10 seconds will be discarded when creating a new sample and never
285+
passed to the resampling function.
286+
"""
287+
288+
resampling_function: ResamplingFunction2 = lambda samples, _, __: statistics.fmean(
289+
s[1] for s in samples
290+
)
291+
"""The resampling function.
292+
293+
This function will be applied to the sequence of relevant samples at
294+
a given time. The result of the function is what is sent as the resampled
295+
value.
296+
"""
297+
298+
initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT
299+
"""The initial length of the resampling buffer.
300+
301+
The buffer could grow or shrink depending on the source properties,
302+
like sampling rate, to make sure all the requested past sampling periods
303+
can be stored.
304+
305+
It must be at least 1 and at most `max_buffer_len`.
306+
"""
307+
308+
warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN
309+
"""The minimum length of the resampling buffer that will emit a warning.
310+
311+
If a buffer grows bigger than this value, it will emit a warning in the
312+
logs, so buffers don't grow too big inadvertently.
313+
314+
It must be at least 1 and at most `max_buffer_len`.
315+
"""
316+
317+
max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX
318+
"""The maximum length of the resampling buffer.
319+
320+
Buffers won't be allowed to grow beyond this point even if it would be
321+
needed to keep all the requested past sampling periods. An error will be
322+
emitted in the logs if the buffer length needs to be truncated to this
323+
value.
324+
325+
It must be at bigger than `warn_buffer_len`.
326+
"""
327+
328+
align_to: datetime | None = field(default=None, init=False)
329+
"""Deprecated: Use timer_config.align_to instead."""
330+
331+
timer_config: WallClockTimerConfig | None = None
332+
"""The custom configuration of the wall clock timer used to keep track of time.
333+
334+
If not provided or `None`, a configuration will be created by passing the
335+
[`resampling_period`][frequenz.sdk.timeseries.ResamplerConfig2.resampling_period] to
336+
the [`from_interval()`][frequenz.sdk.timeseries.WallClockTimerConfig.from_interval]
337+
method.
338+
"""
339+
340+
def __post_init__(self) -> None:
341+
"""Check that config values are valid.
342+
343+
Raises:
344+
ValueError: If any value is out of range.
345+
"""
346+
if self.resampling_period.total_seconds() < 0.0:
347+
raise ValueError(
348+
f"resampling_period ({self.resampling_period}) must be positive"
349+
)
350+
if self.max_data_age_in_periods < 1.0:
351+
raise ValueError(
352+
f"max_data_age_in_periods ({self.max_data_age_in_periods}) should be at least 1.0"
353+
)
354+
if self.warn_buffer_len < 1:
355+
raise ValueError(
356+
f"warn_buffer_len ({self.warn_buffer_len}) should be at least 1"
357+
)
358+
if self.max_buffer_len <= self.warn_buffer_len:
359+
raise ValueError(
360+
f"max_buffer_len ({self.max_buffer_len}) should "
361+
f"be bigger than warn_buffer_len ({self.warn_buffer_len})"
362+
)
363+
364+
if self.initial_buffer_len < 1:
365+
raise ValueError(
366+
f"initial_buffer_len ({self.initial_buffer_len}) should at least 1"
367+
)
368+
if self.initial_buffer_len > self.max_buffer_len:
369+
raise ValueError(
370+
f"initial_buffer_len ({self.initial_buffer_len}) is bigger "
371+
f"than max_buffer_len ({self.max_buffer_len}), use a smaller "
372+
"initial_buffer_len or a bigger max_buffer_len"
373+
)
374+
if self.initial_buffer_len > self.warn_buffer_len:
375+
_logger.warning(
376+
"initial_buffer_len (%s) is bigger than warn_buffer_len (%s)",
377+
self.initial_buffer_len,
378+
self.warn_buffer_len,
379+
)
380+
if self.align_to is not None:
381+
raise ValueError(
382+
f"align_to ({self.align_to}) must be specified via timer_config"
383+
)

0 commit comments

Comments
 (0)