Skip to content

Commit a24e784

Browse files
Prevent stacking of power requests
The power distributing actor processes one power request at a time to prevent multiple requests for the same components from being sent to the microgrid API concurrently. Previously, this could lead to the request channel receiver becoming full if the power request frequency was higher than the processing time and even worse, the requests could be processed late in time causing unexpected behavior for applications setting power requests. This patch ensures that the actor processes one request at a time with the same components and keeps track of the latest pending request if there is an existing request with the same set of components being processed. The pending request will overwrite the previous pending one and the actor will process it once the request with the same components is done processing. Signed-off-by: Daniel Zullo <[email protected]>
1 parent d69917d commit a24e784

File tree

2 files changed

+77
-2
lines changed

2 files changed

+77
-2
lines changed

src/frequenz/sdk/actor/power_distributing/power_distributing.py

+62-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
distributing the power between the components and sends the results back to it.
1010
"""
1111

12-
12+
import asyncio
13+
import logging
1314
from datetime import timedelta
1415

1516
from frequenz.channels import Receiver, Sender
@@ -27,6 +28,8 @@
2728
from .request import Request
2829
from .result import Result
2930

31+
_logger = logging.getLogger(__name__)
32+
3033

3134
class PowerDistributingActor(Actor):
3235
# pylint: disable=too-many-instance-attributes
@@ -90,6 +93,16 @@ def __init__( # pylint: disable=too-many-arguments
9093
self._result_sender = results_sender
9194
self._api_power_request_timeout = api_power_request_timeout
9295

96+
self._processing_tasks: dict[str, asyncio.Task[None]] = {}
97+
"""Track the power request tasks currently being processed."""
98+
99+
self._pending_requests: dict[str, Request] = {}
100+
"""Track the power requests that are waiting to be processed.
101+
102+
Only one pending power request is kept for each set of components, the
103+
latest request will overwrite the previous one.
104+
"""
105+
93106
self._component_manager: ComponentManager
94107
if component_category == ComponentCategory.BATTERY:
95108
self._component_manager = BatteryManager(
@@ -128,7 +141,19 @@ async def _run(self) -> None:
128141
await self._component_manager.start()
129142

130143
async for request in self._requests_receiver:
131-
await self._component_manager.distribute_power(request)
144+
task_key = frozenset(request.component_ids)
145+
task_name = f"{type(self).__name__}:{task_key}"
146+
147+
if task_name in self._processing_tasks:
148+
if task_name in self._pending_requests:
149+
_logger.warning(
150+
"Pending request: %s, overwritten with request: %s",
151+
self._pending_requests[task_name],
152+
request,
153+
)
154+
self._pending_requests[task_name] = request
155+
else:
156+
self._process_request(task_name, request)
132157

133158
@override
134159
async def stop(self, msg: str | None = None) -> None:
@@ -139,3 +164,38 @@ async def stop(self, msg: str | None = None) -> None:
139164
"""
140165
await self._component_manager.stop()
141166
await super().stop(msg)
167+
168+
def _handle_task_completion(self, task: asyncio.Task[None]) -> None:
169+
"""Handle the completion of a power request task.
170+
171+
Args:
172+
task: The task that has completed.
173+
"""
174+
task_name = task.get_name()
175+
176+
try:
177+
task.result()
178+
except Exception: # pylint: disable=broad-except
179+
_logger.exception("Unhandled error while setting the power request")
180+
181+
if task_name in self._processing_tasks:
182+
del self._processing_tasks[task_name]
183+
else:
184+
_logger.error("Task not found in processing tasks: %s", task_name)
185+
186+
if task_name in self._pending_requests:
187+
request = self._pending_requests.pop(task_name)
188+
self._process_request(task_name, request)
189+
190+
def _process_request(self, name: str, request: Request) -> None:
191+
"""Process a power request.
192+
193+
Args:
194+
name: The name to set for the processing task.
195+
request: The power request to process.
196+
"""
197+
self._processing_tasks[name] = asyncio.create_task(
198+
self._component_manager.distribute_power(request),
199+
name=name,
200+
)
201+
self._processing_tasks[name].add_done_callback(self._handle_task_completion)

tests/timeseries/_battery_pool/test_battery_pool_control_methods.py

+15
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,8 @@ async def test_case_1(
213213
await bounds_rx.receive(), power=1000.0, lower=-4000.0, upper=4000.0
214214
)
215215

216+
await asyncio.sleep(0.0)
217+
216218
assert set_power.call_count == 4
217219
assert sorted(set_power.call_args_list) == [
218220
mocker.call(inv_id, 250.0)
@@ -248,6 +250,7 @@ async def side_effect(inv_id: int, _: float) -> None:
248250
result, power_distributing.Success
249251
),
250252
)
253+
await asyncio.sleep(0.0)
251254
assert set_power.call_count == 4
252255
assert sorted(set_power.call_args_list) == [
253256
mocker.call(inv_id, 25.0) for inv_id in mocks.microgrid.battery_inverter_ids
@@ -267,6 +270,7 @@ async def side_effect(inv_id: int, _: float) -> None:
267270

268271
# There should be an automatic retry.
269272
set_power.side_effect = None
273+
await asyncio.sleep(0.0)
270274
assert set_power.call_count == 4
271275
assert sorted(set_power.call_args_list) == [
272276
mocker.call(inv_id, 25.0) for inv_id in mocks.microgrid.battery_inverter_ids
@@ -318,6 +322,7 @@ async def test_case_2(self, mocks: Mocks, mocker: MockerFixture) -> None:
318322
self._assert_report(
319323
await bounds_1_rx.receive(), power=1000.0, lower=-2000.0, upper=2000.0
320324
)
325+
await asyncio.sleep(0.0)
321326
assert set_power.call_count == 2
322327
assert sorted(set_power.call_args_list) == [
323328
mocker.call(inv_id, 500.0)
@@ -331,6 +336,7 @@ async def test_case_2(self, mocks: Mocks, mocker: MockerFixture) -> None:
331336
if not latest_dist_result_2.has_value():
332337
bounds = await bounds_2_rx.receive()
333338
self._assert_report(bounds, power=1000.0, lower=-2000.0, upper=2000.0)
339+
await asyncio.sleep(0.0)
334340
assert set_power.call_count == 2
335341
assert sorted(set_power.call_args_list) == [
336342
mocker.call(inv_id, 500.0)
@@ -376,6 +382,7 @@ async def test_case_3(self, mocks: Mocks, mocker: MockerFixture) -> None:
376382
await bounds_2_rx.receive(), power=-1000.0, lower=-1000.0, upper=0.0
377383
)
378384

385+
await asyncio.sleep(0.0)
379386
assert set_power.call_count == 4
380387
assert sorted(set_power.call_args_list) == [
381388
mocker.call(inv_id, -250.0)
@@ -395,6 +402,7 @@ async def test_case_3(self, mocks: Mocks, mocker: MockerFixture) -> None:
395402
bounds = await bounds_2_rx.receive()
396403
self._assert_report(bounds, power=0.0, lower=-1000.0, upper=0.0)
397404

405+
await asyncio.sleep(0.0)
398406
assert set_power.call_count == 4
399407
assert sorted(set_power.call_args_list) == [
400408
mocker.call(inv_id, 0.0) for inv_id in mocks.microgrid.battery_inverter_ids
@@ -428,6 +436,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None:
428436
self._assert_report(
429437
await bounds_rx.receive(), power=1000.0, lower=-4000.0, upper=4000.0
430438
)
439+
await asyncio.sleep(0.0)
431440
assert set_power.call_count == 4
432441
assert sorted(set_power.call_args_list) == [
433442
mocker.call(inv_id, 250.0)
@@ -453,6 +462,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None:
453462
self._assert_report(
454463
await bounds_rx.receive(), power=400.0, lower=-4000.0, upper=4000.0
455464
)
465+
await asyncio.sleep(0.0)
456466
assert set_power.call_count == 4
457467
assert sorted(set_power.call_args_list) == [
458468
mocker.call(inv_id, 100.0)
@@ -477,6 +487,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None:
477487
self._assert_report(
478488
await bounds_rx.receive(), power=0.0, lower=-4000.0, upper=4000.0
479489
)
490+
await asyncio.sleep(0.0)
480491
assert set_power.call_count == 4
481492
assert sorted(set_power.call_args_list) == [
482493
mocker.call(inv_id, 0.0) for inv_id in mocks.microgrid.battery_inverter_ids
@@ -501,6 +512,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None:
501512
self._assert_report(
502513
await bounds_rx.receive(), power=-400.0, lower=-4000.0, upper=4000.0
503514
)
515+
await asyncio.sleep(0.0)
504516
assert set_power.call_count == 4
505517
assert sorted(set_power.call_args_list) == [
506518
mocker.call(inv_id, -100.0)
@@ -587,6 +599,7 @@ async def test_case_5( # pylint: disable=too-many-statements,too-many-locals
587599
await bounds_1_rx.receive(), power=200.0, lower=-1000.0, upper=1500.0
588600
)
589601

602+
await asyncio.sleep(0.0)
590603
assert set_power.call_count == 4
591604
assert sorted(set_power.call_args_list) == [
592605
mocker.call(inv_id, 50.0) for inv_id in mocks.microgrid.battery_inverter_ids
@@ -624,6 +637,7 @@ async def test_case_5( # pylint: disable=too-many-statements,too-many-locals
624637
if dist_result.succeeded_power == Power.from_watts(720.0):
625638
break
626639

640+
await asyncio.sleep(0.0)
627641
assert set_power.call_count == 4
628642
assert sorted(set_power.call_args_list) == [
629643
mocker.call(inv_id, 720.0 / 4)
@@ -664,6 +678,7 @@ async def test_case_5( # pylint: disable=too-many-statements,too-many-locals
664678
if dist_result.succeeded_power == Power.from_watts(-280.0):
665679
break
666680

681+
await asyncio.sleep(0.0)
667682
assert set_power.call_count == 4
668683
assert sorted(set_power.call_args_list) == [
669684
mocker.call(inv_id, -280.0 / 4)

0 commit comments

Comments
 (0)