Skip to content

Commit 0a731af

Browse files
Prevent stacking of power requests
The power distributing actor processes one power request at a time to prevent multiple requests for the same components from being sent to the microgrid API concurrently. Previously, this could lead to the request channel receiver becoming full if the power request frequency was higher than the processing time. Even worse, the requests could be processed late, causing unexpected behavior for applications setting power requests. Moreover, the actor was blocking power requests with different sets of components from being processed if there was any existing request. This patch ensures that the actor processes one request at a time for different sets of components and keeps track of the latest pending request if there is an existing request with the same set of components being processed. The pending request will be overwritten by the latest received request with the same set of components, and the actor will process it once the request with the same components is done processing. Signed-off-by: Daniel Zullo <[email protected]>
1 parent fd5d778 commit 0a731af

File tree

2 files changed

+77
-4
lines changed

2 files changed

+77
-4
lines changed

src/frequenz/sdk/actor/power_distributing/power_distributing.py

+63-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
"""
1111

1212

13+
import asyncio
14+
import logging
1315
from datetime import timedelta
1416

1517
from frequenz.channels import Receiver, Sender
@@ -27,6 +29,8 @@
2729
from .request import Request
2830
from .result import Result
2931

32+
_logger = logging.getLogger(__name__)
33+
3034

3135
class PowerDistributingActor(Actor):
3236
# pylint: disable=too-many-instance-attributes
@@ -97,6 +101,16 @@ def __init__( # pylint: disable=too-many-arguments
97101
self._result_sender = results_sender
98102
self._api_power_request_timeout = api_power_request_timeout
99103

104+
self._processing_tasks: dict[str, asyncio.Task[None]] = {}
105+
"""Track the power request tasks currently being processed."""
106+
107+
self._pending_requests: dict[str, Request] = {}
108+
"""Track the power requests that are waiting to be processed.
109+
110+
Only one pending power request is kept for each set of components, the
111+
latest request will overwrite the previous one.
112+
"""
113+
100114
self._component_manager: ComponentManager
101115
if component_category == ComponentCategory.BATTERY:
102116
self._component_manager = BatteryManager(
@@ -135,7 +149,21 @@ async def _run(self) -> None:
135149
await self._component_manager.start()
136150

137151
async for request in self._requests_receiver:
138-
await self._component_manager.distribute_power(request)
152+
key = hex(hash(frozenset(request.component_ids)))
153+
name = f"{type(self).__name__}:{key}"
154+
155+
_logger.debug("%r distributing power request: %s", name, request)
156+
157+
if name in self._processing_tasks:
158+
if name in self._pending_requests:
159+
_logger.warning(
160+
"Pending request: %s, overwritten with request: %s",
161+
self._pending_requests[name],
162+
request,
163+
)
164+
self._pending_requests[name] = request
165+
else:
166+
self._process_request(name, request)
139167

140168
@override
141169
async def stop(self, msg: str | None = None) -> None:
@@ -146,3 +174,37 @@ async def stop(self, msg: str | None = None) -> None:
146174
"""
147175
await self._component_manager.stop()
148176
await super().stop(msg)
177+
178+
def _handle_task_completion(self, task: asyncio.Task[None]) -> None:
179+
"""Handle the completion of a power request task.
180+
181+
Args:
182+
task: The task that has completed.
183+
"""
184+
task_name = task.get_name()
185+
186+
try:
187+
task.result()
188+
except Exception: # pylint: disable=broad-except
189+
_logger.exception("Power distributing task failed: %s", task_name)
190+
191+
if task_name in self._pending_requests:
192+
request = self._pending_requests.pop(task_name)
193+
self._process_request(task_name, request)
194+
elif task_name in self._processing_tasks:
195+
del self._processing_tasks[task_name]
196+
else:
197+
_logger.error("Task not found in processing tasks: %s", task_name)
198+
199+
def _process_request(self, name: str, request: Request) -> None:
200+
"""Process a power request.
201+
202+
Args:
203+
name: The name to set for the processing task.
204+
request: The power request to process.
205+
"""
206+
self._processing_tasks[name] = asyncio.create_task(
207+
self._component_manager.distribute_power(request),
208+
name=name,
209+
)
210+
self._processing_tasks[name].add_done_callback(self._handle_task_completion)

tests/timeseries/_battery_pool/test_battery_pool_control_methods.py

+14-3
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ async def test_case_1(
213213
await bounds_rx.receive(), power=1000.0, lower=-4000.0, upper=4000.0
214214
)
215215

216+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
216217
assert set_power.call_count == 4
217218
assert sorted(set_power.call_args_list) == [
218219
mocker.call(inv_id, 250.0)
@@ -248,6 +249,7 @@ async def side_effect(inv_id: int, _: float) -> None:
248249
result, power_distributing.Success
249250
),
250251
)
252+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
251253
assert set_power.call_count == 4
252254
assert sorted(set_power.call_args_list) == [
253255
mocker.call(inv_id, 25.0) for inv_id in mocks.microgrid.battery_inverter_ids
@@ -267,6 +269,7 @@ async def side_effect(inv_id: int, _: float) -> None:
267269

268270
# There should be an automatic retry.
269271
set_power.side_effect = None
272+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
270273
assert set_power.call_count == 4
271274
assert sorted(set_power.call_args_list) == [
272275
mocker.call(inv_id, 25.0) for inv_id in mocks.microgrid.battery_inverter_ids
@@ -318,6 +321,7 @@ async def test_case_2(self, mocks: Mocks, mocker: MockerFixture) -> None:
318321
self._assert_report(
319322
await bounds_1_rx.receive(), power=1000.0, lower=-2000.0, upper=2000.0
320323
)
324+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
321325
assert set_power.call_count == 2
322326
assert sorted(set_power.call_args_list) == [
323327
mocker.call(inv_id, 500.0)
@@ -331,6 +335,7 @@ async def test_case_2(self, mocks: Mocks, mocker: MockerFixture) -> None:
331335
if not latest_dist_result_2.has_value():
332336
bounds = await bounds_2_rx.receive()
333337
self._assert_report(bounds, power=1000.0, lower=-2000.0, upper=2000.0)
338+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
334339
assert set_power.call_count == 2
335340
assert sorted(set_power.call_args_list) == [
336341
mocker.call(inv_id, 500.0)
@@ -375,7 +380,7 @@ async def test_case_3(self, mocks: Mocks, mocker: MockerFixture) -> None:
375380
self._assert_report(
376381
await bounds_2_rx.receive(), power=-1000.0, lower=-1000.0, upper=0.0
377382
)
378-
383+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
379384
assert set_power.call_count == 4
380385
assert sorted(set_power.call_args_list) == [
381386
mocker.call(inv_id, -250.0)
@@ -394,7 +399,7 @@ async def test_case_3(self, mocks: Mocks, mocker: MockerFixture) -> None:
394399
if not latest_dist_result_2.has_value():
395400
bounds = await bounds_2_rx.receive()
396401
self._assert_report(bounds, power=0.0, lower=-1000.0, upper=0.0)
397-
402+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
398403
assert set_power.call_count == 4
399404
assert sorted(set_power.call_args_list) == [
400405
mocker.call(inv_id, 0.0) for inv_id in mocks.microgrid.battery_inverter_ids
@@ -428,6 +433,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None:
428433
self._assert_report(
429434
await bounds_rx.receive(), power=1000.0, lower=-4000.0, upper=4000.0
430435
)
436+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
431437
assert set_power.call_count == 4
432438
assert sorted(set_power.call_args_list) == [
433439
mocker.call(inv_id, 250.0)
@@ -453,6 +459,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None:
453459
self._assert_report(
454460
await bounds_rx.receive(), power=400.0, lower=-4000.0, upper=4000.0
455461
)
462+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
456463
assert set_power.call_count == 4
457464
assert sorted(set_power.call_args_list) == [
458465
mocker.call(inv_id, 100.0)
@@ -477,6 +484,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None:
477484
self._assert_report(
478485
await bounds_rx.receive(), power=0.0, lower=-4000.0, upper=4000.0
479486
)
487+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
480488
assert set_power.call_count == 4
481489
assert sorted(set_power.call_args_list) == [
482490
mocker.call(inv_id, 0.0) for inv_id in mocks.microgrid.battery_inverter_ids
@@ -501,6 +509,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None:
501509
self._assert_report(
502510
await bounds_rx.receive(), power=-400.0, lower=-4000.0, upper=4000.0
503511
)
512+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
504513
assert set_power.call_count == 4
505514
assert sorted(set_power.call_args_list) == [
506515
mocker.call(inv_id, -100.0)
@@ -586,7 +595,7 @@ async def test_case_5( # pylint: disable=too-many-statements,too-many-locals
586595
self._assert_report(
587596
await bounds_1_rx.receive(), power=200.0, lower=-1000.0, upper=1500.0
588597
)
589-
598+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
590599
assert set_power.call_count == 4
591600
assert sorted(set_power.call_args_list) == [
592601
mocker.call(inv_id, 50.0) for inv_id in mocks.microgrid.battery_inverter_ids
@@ -624,6 +633,7 @@ async def test_case_5( # pylint: disable=too-many-statements,too-many-locals
624633
if dist_result.succeeded_power == Power.from_watts(720.0):
625634
break
626635

636+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
627637
assert set_power.call_count == 4
628638
assert sorted(set_power.call_args_list) == [
629639
mocker.call(inv_id, 720.0 / 4)
@@ -664,6 +674,7 @@ async def test_case_5( # pylint: disable=too-many-statements,too-many-locals
664674
if dist_result.succeeded_power == Power.from_watts(-280.0):
665675
break
666676

677+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
667678
assert set_power.call_count == 4
668679
assert sorted(set_power.call_args_list) == [
669680
mocker.call(inv_id, -280.0 / 4)

0 commit comments

Comments
 (0)