Skip to content

Commit f52f791

Browse files
authored
Make the registry type-aware (#830)
The channel registry is using `Any` as the message type, which is not type safe, as it completely *disables* type checking for the messages. This commit makes the channel registry type-aware, so channels are stored with their message type and the registry checks that the same channel is not used for different message types. This also makes the registry just a plain container for channels, the wrapper methods to create new senders and receivers, and to configure the `resend_latest` flag are removed. The `ReceiverFetcher` abstraction is also not needed if we just return the channel directly, as the channel itself is a `ReceiverFetcher`. Also the method to close and remove a channel is made public and the name more explicit, as it is used in normal code paths. The new registry only provide 2 main methods: * `get_or_create()`: Get or create a channel for the given key, doing the type checking to make sure the requested message type matches the existing channel message type if it already exists. * `close_and_remove()`: Close and remove the channel for the given key. This change uncovered 5 issues: * `MockMigrogrid/Resampler: Fail on unhandled exceptions` (in this PR, but more generally #826) * `Fix checks in tests` (in this PR) * `Fix missing conversion to `QuantityT` (workaroud in this PR, but should be better solved by #821) * #807 * #823 Fixes #806.
2 parents 3e9c892 + 11da62a commit f52f791

19 files changed

+379
-191
lines changed

RELEASE_NOTES.md

+18
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,28 @@
5050

5151
- The `microgrid.frequency()` method no longer supports passing the `component` parameter. Instead the best component is automatically selected.
5252

53+
- The `actor.ChannelRegistry` was rewritten to be type-aware and just a container of channels. You now need to provide the type of message that will be contained by the channel and use the `get_or_create()` method to get a channel and the `stop_and_remove()` method to stop and remove a channel. Once you get a channel you can create new senders and receivers, or set channel options, as usual. Please read the docs for a full description, but in general this:
54+
55+
```python
56+
r = registry.new_receiver(name)
57+
s = registry.new_sender(name)
58+
```
59+
60+
Should be replaced by:
61+
62+
```python
63+
r = registry.get_or_create(T, name).new_receiver()
64+
s = registry.get_or_create(T, name).new_sender()
65+
```
66+
67+
- The `ReceiverFetcher` interface was slightly changed to make `maxsize` a keyword-only argument. This is to make it compatible with the `Broadcast` channel, so it can be considered a `ReceiverFetcher`.
68+
5369
## New Features
5470

5571
- A new method `microgrid.voltage()` was added to allow easy access to the phase-to-neutral 3-phase voltage of the microgrid.
5672

73+
- The `actor.ChannelRegistry` is now type-aware.
74+
5775
## Bug Fixes
5876

5977
- 0W power requests are now not adjusted to exclusion bounds by the `PowerManager` and `PowerDistributor`, and are sent over to the microgrid API directly.

benchmarks/timeseries/benchmark_datasourcing.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,9 @@ async def consume(channel: Receiver[Any]) -> None:
108108
"current_phase_requests", evc_id, component_metric_id, None
109109
)
110110

111-
recv_channel = channel_registry.new_receiver(request.get_channel_name())
111+
recv_channel = channel_registry.get_or_create(
112+
ComponentMetricRequest, request.get_channel_name()
113+
).new_receiver()
112114

113115
await request_sender.send(request)
114116
consume_tasks.append(asyncio.create_task(consume(recv_channel)))

src/frequenz/sdk/_internal/_channels.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class ReceiverFetcher(typing.Generic[T], typing.Protocol):
1616
"""An interface that just exposes a `new_receiver` method."""
1717

1818
@abc.abstractmethod
19-
def new_receiver(self, maxsize: int = 50) -> Receiver[T]:
19+
def new_receiver(self, *, maxsize: int = 50) -> Receiver[T]:
2020
"""Get a receiver from the channel.
2121
2222
Args:

src/frequenz/sdk/actor/_channel_registry.py

+109-93
Original file line numberDiff line numberDiff line change
@@ -3,132 +3,148 @@
33

44
"""A class that would dynamically create, own and provide access to channels."""
55

6-
from __future__ import annotations
6+
import dataclasses
7+
import logging
8+
import traceback
9+
from typing import TypeVar, cast
710

8-
import typing
11+
from frequenz.channels import Broadcast
912

10-
from frequenz.channels import Broadcast, Receiver, Sender
11-
12-
from .._internal._channels import ReceiverFetcher
13+
_T = TypeVar("_T")
14+
_logger = logging.getLogger(__name__)
1315

1416

1517
class ChannelRegistry:
16-
"""Dynamically creates, own and provide access to channels.
18+
"""Dynamically creates, own and provide access to broadcast channels.
1719
1820
It can be used by actors to dynamically establish a communication channel
19-
between each other. Channels are identified by string names.
21+
between each other.
22+
23+
The registry is responsible for creating channels when they are first requested via
24+
the [`get_or_create()`][frequenz.sdk.actor.ChannelRegistry.get_or_create] method.
25+
26+
The registry also stores type information to make sure that the same channel is not
27+
used for different message types.
28+
29+
Since the registry owns the channels, it is also responsible for closing them when
30+
they are no longer needed. There is no way to remove a channel without closing it.
31+
32+
Note:
33+
This registry stores [`Broadcast`][frequenz.channels.Broadcast] channels.
2034
"""
2135

2236
def __init__(self, *, name: str) -> None:
23-
"""Create a `ChannelRegistry` instance.
37+
"""Initialize this registry.
2438
2539
Args:
26-
name: A unique name for the registry.
40+
name: A name to identify the registry in the logs. This name is also used as
41+
a prefix for the channel names.
2742
"""
2843
self._name = name
29-
self._channels: dict[str, Broadcast[typing.Any]] = {}
44+
self._channels: dict[str, _Entry] = {}
3045

31-
def set_resend_latest(self, key: str, resend_latest: bool) -> None:
32-
"""Set the `resend_latest` flag for a given channel.
46+
@property
47+
def name(self) -> str:
48+
"""The name of this registry."""
49+
return self._name
3350

34-
This flag controls whether the latest value of the channel should be resent to
35-
new receivers, in slow streams.
36-
37-
`resend_latest` is `False` by default. It is safe to be set in data/reporting
38-
channels, but is not recommended for use in channels that stream control
39-
instructions.
51+
def message_type(self, key: str) -> type:
52+
"""Get the message type of the channel for the given key.
4053
4154
Args:
4255
key: The key to identify the channel.
43-
resend_latest: Whether to resend the latest value to new receivers, for the
44-
given channel.
45-
"""
46-
if key not in self._channels:
47-
self._channels[key] = Broadcast(f"{self._name}-{key}")
48-
# This attribute is protected in the current version of the channels library,
49-
# but that will change in the future.
50-
self._channels[key].resend_latest = resend_latest
51-
52-
def new_sender(self, key: str) -> Sender[typing.Any]:
53-
"""Get a sender to a dynamically created channel with the given key.
54-
55-
Args:
56-
key: A key to identify the channel.
5756
5857
Returns:
59-
A sender to a dynamically created channel with the given key.
60-
"""
61-
if key not in self._channels:
62-
self._channels[key] = Broadcast(f"{self._name}-{key}")
63-
return self._channels[key].new_sender()
64-
65-
def new_receiver(self, key: str, maxsize: int = 50) -> Receiver[typing.Any]:
66-
"""Get a receiver to a dynamically created channel with the given key.
58+
The message type of the channel.
6759
68-
Args:
69-
key: A key to identify the channel.
70-
maxsize: The maximum size of the receiver.
71-
72-
Returns:
73-
A receiver for a dynamically created channel with the given key.
60+
Raises:
61+
KeyError: If the channel does not exist.
7462
"""
75-
if key not in self._channels:
76-
self._channels[key] = Broadcast(f"{self._name}-{key}")
77-
return self._channels[key].new_receiver(maxsize=maxsize)
63+
entry = self._channels.get(key)
64+
if entry is None:
65+
raise KeyError(f"No channel for key {key!r} exists.")
66+
return entry.message_type
7867

79-
def new_receiver_fetcher(self, key: str) -> ReceiverFetcher[typing.Any]:
80-
"""Get a receiver fetcher to a dynamically created channel with the given key.
68+
def __contains__(self, key: str) -> bool:
69+
"""Check whether the channel for the given `key` exists."""
70+
return key in self._channels
8171

82-
Args:
83-
key: A key to identify the channel.
84-
85-
Returns:
86-
A receiver fetcher for a dynamically created channel with the given key.
87-
"""
88-
if key not in self._channels:
89-
self._channels[key] = Broadcast(f"{self._name}-{key}")
90-
return _RegistryReceiverFetcher(self, key)
72+
def get_or_create(self, message_type: type[_T], key: str) -> Broadcast[_T]:
73+
"""Get or create a channel for the given key.
9174
92-
async def _close_channel(self, key: str) -> None:
93-
"""Close a channel with the given key.
75+
If a channel for the given key already exists, the message type of the existing
76+
channel is checked against the requested message type. If they do not match,
77+
a `ValueError` is raised.
9478
95-
This method is private and should only be used in special cases.
79+
Note:
80+
The types have to match exactly, it doesn't do a subtype check due to
81+
technical limitations. In the future subtype checks might be supported.
9682
9783
Args:
98-
key: A key to identify the channel.
99-
"""
100-
if key in self._channels:
101-
if channel := self._channels.pop(key, None):
102-
await channel.close()
103-
104-
105-
T = typing.TypeVar("T")
106-
107-
108-
class _RegistryReceiverFetcher(typing.Generic[T]):
109-
"""A receiver fetcher that is bound to a channel registry and a key."""
84+
message_type: The type of the message that is sent through the channel.
85+
key: The key to identify the channel.
11086
111-
def __init__(
112-
self,
113-
registry: ChannelRegistry,
114-
key: str,
115-
) -> None:
116-
"""Create a new instance of a receiver fetcher.
87+
Returns:
88+
The channel for the given key.
11789
118-
Args:
119-
registry: The channel registry.
120-
key: A key to identify the channel.
90+
Raises:
91+
ValueError: If the channel exists and the message type does not match.
12192
"""
122-
self._registry = registry
123-
self._key = key
124-
125-
def new_receiver(self, maxsize: int = 50) -> Receiver[T]:
126-
"""Get a receiver from the channel.
93+
if key not in self._channels:
94+
if _logger.isEnabledFor(logging.DEBUG):
95+
_logger.debug(
96+
"Creating a new channel for key %r with type %s at:\n%s",
97+
key,
98+
message_type,
99+
"".join(traceback.format_stack(limit=10)[:9]),
100+
)
101+
self._channels[key] = _Entry(message_type, Broadcast(f"{self._name}-{key}"))
102+
103+
entry = self._channels[key]
104+
if entry.message_type is not message_type:
105+
exception = ValueError(
106+
f"Type mismatch, a channel for key {key!r} exists and the requested "
107+
f"message type {message_type} is not the same as the existing "
108+
f"message type {entry.message_type}."
109+
)
110+
if _logger.isEnabledFor(logging.DEBUG):
111+
_logger.debug(
112+
"%s at:\n%s",
113+
str(exception),
114+
# We skip the last frame because it's this method, and limit the
115+
# stack to 9 frames to avoid adding too much noise.
116+
"".join(traceback.format_stack(limit=10)[:9]),
117+
)
118+
raise exception
119+
120+
return cast(Broadcast[_T], entry.channel)
121+
122+
async def close_and_remove(self, key: str) -> None:
123+
"""Remove the channel for the given key.
127124
128125
Args:
129-
maxsize: The maximum size of the receiver.
126+
key: The key to identify the channel.
130127
131-
Returns:
132-
A receiver instance.
128+
Raises:
129+
KeyError: If the channel does not exist.
133130
"""
134-
return self._registry.new_receiver(self._key, maxsize)
131+
entry = self._channels.pop(key, None)
132+
if entry is None:
133+
raise KeyError(f"No channel for key {key!r} exists.")
134+
await entry.channel.close()
135+
136+
137+
@dataclasses.dataclass(frozen=True)
138+
class _Entry:
139+
"""An entry in a channel registry."""
140+
141+
message_type: type
142+
"""The type of the message that is sent through the channel in this entry."""
143+
144+
# We use object instead of Any to minimize the chances of hindering type checking.
145+
# If for some reason the channel is not casted to the proper underlaying type, when
146+
# using object at least accessing any member that's not part of the object base
147+
# class will yield a type error, while if we used Any, it would not and the issue
148+
# would be much harder to find.
149+
channel: Broadcast[object]
150+
"""The channel in this entry."""

src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,9 @@ def _get_metric_senders(
325325
(
326326
self._get_data_extraction_method(category, metric),
327327
[
328-
self._registry.new_sender(request.get_channel_name())
328+
self._registry.get_or_create(
329+
Sample[Quantity], request.get_channel_name()
330+
).new_sender()
329331
for request in req_list
330332
],
331333
)
@@ -379,8 +381,7 @@ def process_msg(data: Any) -> None:
379381

380382
await asyncio.gather(
381383
*[
382-
# pylint: disable=protected-access
383-
self._registry._close_channel(r.get_channel_name())
384+
self._registry.close_and_remove(r.get_channel_name())
384385
for requests in self._req_streaming_metrics[comp_id].values()
385386
for r in requests
386387
]

src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -211,14 +211,16 @@ async def _run(self) -> None:
211211

212212
if component_ids not in self._subscriptions:
213213
self._subscriptions[component_ids] = {
214-
priority: self._channel_registry.new_sender(
215-
sub.get_channel_name()
216-
)
214+
priority: self._channel_registry.get_or_create(
215+
_Report, sub.get_channel_name()
216+
).new_sender()
217217
}
218218
elif priority not in self._subscriptions[component_ids]:
219219
self._subscriptions[component_ids][
220220
priority
221-
] = self._channel_registry.new_sender(sub.get_channel_name())
221+
] = self._channel_registry.get_or_create(
222+
_Report, sub.get_channel_name()
223+
).new_sender()
222224

223225
if sub.component_ids not in self._bound_tracker_tasks:
224226
self._add_bounds_tracker(sub.component_ids)

src/frequenz/sdk/actor/_resampling.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,15 @@ async def _subscribe(self, request: ComponentMetricRequest) -> None:
7777
)
7878
data_source_channel_name = data_source_request.get_channel_name()
7979
await self._data_sourcing_request_sender.send(data_source_request)
80-
receiver = self._channel_registry.new_receiver(data_source_channel_name)
80+
receiver = self._channel_registry.get_or_create(
81+
Sample[Quantity], data_source_channel_name
82+
).new_receiver()
8183

8284
# This is a temporary hack until the Sender implementation uses
8385
# exceptions to report errors.
84-
sender = self._channel_registry.new_sender(request_channel_name)
86+
sender = self._channel_registry.get_or_create(
87+
Sample[Quantity], request_channel_name
88+
).new_sender()
8589

8690
async def sink_adapter(sample: Sample[Quantity]) -> None:
8791
await sender.send(sample)

src/frequenz/sdk/timeseries/_base_types.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,10 @@
66
import dataclasses
77
import enum
88
import functools
9-
import typing
109
from collections.abc import Callable, Iterator
1110
from dataclasses import dataclass
1211
from datetime import datetime, timezone
13-
from typing import Generic, Self, overload
12+
from typing import Generic, Self, TypeVar, overload
1413

1514
from ._quantities import Power, QuantityT
1615

@@ -140,7 +139,7 @@ def map(
140139
)
141140

142141

143-
_T = typing.TypeVar("_T")
142+
_T = TypeVar("_T")
144143

145144

146145
@dataclass(frozen=True)

src/frequenz/sdk/timeseries/_grid_frequency.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from ..microgrid import connection_manager
1616
from ..microgrid.component import Component, ComponentCategory, ComponentMetricId
1717
from ..timeseries._base_types import Sample
18-
from ..timeseries._quantities import Frequency
18+
from ..timeseries._quantities import Frequency, Quantity
1919

2020
if TYPE_CHECKING:
2121
# Imported here to avoid a circular import.
@@ -95,9 +95,9 @@ def new_receiver(self) -> Receiver[Sample[Frequency]]:
9595
Returns:
9696
A receiver that will receive grid frequency samples.
9797
"""
98-
receiver = self._channel_registry.new_receiver(
99-
self._component_metric_request.get_channel_name()
100-
)
98+
receiver = self._channel_registry.get_or_create(
99+
Sample[Quantity], self._component_metric_request.get_channel_name()
100+
).new_receiver()
101101

102102
if not self._task:
103103
self._task = asyncio.create_task(self._send_request())

0 commit comments

Comments
 (0)