diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index b4dbe18de..8280f2e35 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -43,3 +43,4 @@ - Fixed a bug that was causing the `PowerDistributor` to exit if power requests to PV inverters or EV chargers timeout. - Fix handling of cancelled tasks in the data sourcing and resampling actor. - Fix PV power distribution excluding inverters that haven't sent any data since startup. +- Prevent stacking of power requests to avoid delays in processing when the power requests frequency exceeds the processing time. diff --git a/src/frequenz/sdk/actor/power_distributing/power_distributing.py b/src/frequenz/sdk/actor/power_distributing/power_distributing.py index d418d6eeb..67b3ef7b4 100644 --- a/src/frequenz/sdk/actor/power_distributing/power_distributing.py +++ b/src/frequenz/sdk/actor/power_distributing/power_distributing.py @@ -1,17 +1,17 @@ # License: MIT # Copyright © 2022 Frequenz Energy-as-a-Service GmbH -"""Actor to distribute power between batteries. +"""Actor to distribute power between components. -When charge/discharge method is called the power should be distributed so that -the SoC in batteries stays at the same level. That way of distribution -prevents using only one battery, increasing temperature, and maximize the total -amount power to charge/discharge. +The purpose of this actor is to distribute power between components in a microgrid. -Purpose of this actor is to keep SoC level of each component at the equal level. +The actor receives power requests from the power manager, process them by +distributing the power between the components and sends the results back to it. """ +import asyncio +import logging from datetime import timedelta from frequenz.channels import Receiver, Sender @@ -29,31 +29,33 @@ from .request import Request from .result import Result +_logger = logging.getLogger(__name__) + class PowerDistributingActor(Actor): # pylint: disable=too-many-instance-attributes - """Actor to distribute the power between batteries in a microgrid. - - The purpose of this tool is to keep an equal SoC level in all batteries. - The PowerDistributingActor can have many concurrent users which at this time - need to be known at construction time. + """Actor to distribute the power between components in a microgrid. - For each user a bidirectional channel needs to be created through which - they can send and receive requests and responses. + One instance of the actor can handle only one component category and type, + which needs to be specified at actor startup and it will setup the correct + component manager based on the given category and type. - It is recommended to wait for PowerDistributingActor output with timeout. Otherwise if - the processing function fails then the response will never come. - The timeout should be Result:request_timeout + time for processing the request. + Only one power request is processed at a time to prevent from sending + multiple requests for the same components to the microgrid API at the + same time. Edge cases: - * If there are 2 requests to be processed for the same subset of batteries, then - only the latest request will be processed. Older request will be ignored. User with - older request will get response with Result.Status.IGNORED. - - * If there are 2 requests and their subset of batteries is different but they - overlap (they have at least one common battery), then then both batteries - will be processed. However it is not expected so the proper error log will be - printed. + * If a new power request is received while a power request with the same + set of components is being processed, the new request will be added to + the pending requests. Then the pending request will be processed after the + request with the same set of components being processed is done. Only one + pending request is kept for each set of components, the latest request will + overwrite the previous one if there is any. + + * If there are 2 requests and their set of components is different but they + overlap (they have at least one common component), then both requests will + be processed concurrently. Though, the power manager will make sure this + doesn't happen as overlapping component IDs are not possible at the moment. """ def __init__( # pylint: disable=too-many-arguments @@ -67,7 +69,7 @@ def __init__( # pylint: disable=too-many-arguments component_type: ComponentType | None = None, name: str | None = None, ) -> None: - """Create class instance. + """Create actor instance. Args: requests_receiver: Receiver for receiving power requests from the power @@ -99,6 +101,16 @@ def __init__( # pylint: disable=too-many-arguments self._result_sender = results_sender self._api_power_request_timeout = api_power_request_timeout + self._processing_tasks: dict[frozenset[int], asyncio.Task[None]] = {} + """Track the power request tasks currently being processed.""" + + self._pending_requests: dict[frozenset[int], Request] = {} + """Track the power requests that are waiting to be processed. + + Only one pending power request is kept for each set of components, the + latest request will overwrite the previous one. + """ + self._component_manager: ComponentManager if component_category == ComponentCategory.BATTERY: self._component_manager = BatteryManager( @@ -121,19 +133,34 @@ def __init__( # pylint: disable=too-many-arguments ) @override - async def _run(self) -> None: # pylint: disable=too-many-locals - """Run actor main function. + async def _run(self) -> None: + """Run this actor's logic. + + It waits for new power requests and process them. Only one power request + can be processed at a time to prevent from sending multiple requests for + the same components to the microgrid API at the same time. - It waits for new requests in task_queue and process it, and send - `set_power` request with distributed power. - The output of the `set_power` method is processed. - Every battery and inverter that failed or didn't respond in time will be marked + A new power request will be ignored if a power request with the same + components is currently being processed. + + Every component that failed or didn't respond in time will be marked as broken for some time. """ await self._component_manager.start() async for request in self._requests_receiver: - await self._component_manager.distribute_power(request) + req_id = frozenset(request.component_ids) + + if req_id in self._processing_tasks: + if pending_request := self._pending_requests.get(req_id): + _logger.debug( + "Pending request: %s, overwritten with request: %s", + pending_request, + request, + ) + self._pending_requests[req_id] = request + else: + self._process_request(req_id, request) @override async def stop(self, msg: str | None = None) -> None: @@ -144,3 +171,41 @@ async def stop(self, msg: str | None = None) -> None: """ await self._component_manager.stop() await super().stop(msg) + + def _handle_task_completion( + self, req_id: frozenset[int], request: Request, task: asyncio.Task[None] + ) -> None: + """Handle the completion of a power request task. + + Args: + req_id: The id to identify the power request. + request: The power request that has been processed. + task: The task that has completed. + """ + try: + task.result() + except Exception: # pylint: disable=broad-except + _logger.exception("Failed power request: %s", request) + + if req_id in self._pending_requests: + self._process_request(req_id, self._pending_requests.pop(req_id)) + elif req_id in self._processing_tasks: + del self._processing_tasks[req_id] + else: + _logger.error("Request id not found in processing tasks: %s", req_id) + + def _process_request(self, req_id: frozenset[int], request: Request) -> None: + """Process a power request. + + Args: + req_id: The id to identify the power request. + request: The power request to process. + """ + task = asyncio.create_task( + self._component_manager.distribute_power(request), + name=f"{type(self).__name__}:{request}", + ) + task.add_done_callback( + lambda t: self._handle_task_completion(req_id, request, t) + ) + self._processing_tasks[req_id] = task diff --git a/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py b/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py index 7a7b4bfd7..767e93819 100644 --- a/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py +++ b/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py @@ -213,6 +213,7 @@ async def test_case_1( await bounds_rx.receive(), power=1000.0, lower=-4000.0, upper=4000.0 ) + await asyncio.sleep(0.0) # Wait for the power to be distributed. assert set_power.call_count == 4 assert sorted(set_power.call_args_list) == [ mocker.call(inv_id, 250.0) @@ -248,6 +249,7 @@ async def side_effect(inv_id: int, _: float) -> None: result, power_distributing.Success ), ) + await asyncio.sleep(0.0) # Wait for the power to be distributed. assert set_power.call_count == 4 assert sorted(set_power.call_args_list) == [ mocker.call(inv_id, 25.0) for inv_id in mocks.microgrid.battery_inverter_ids @@ -267,6 +269,7 @@ async def side_effect(inv_id: int, _: float) -> None: # There should be an automatic retry. set_power.side_effect = None + await asyncio.sleep(0.0) # Wait for the power to be distributed. assert set_power.call_count == 4 assert sorted(set_power.call_args_list) == [ mocker.call(inv_id, 25.0) for inv_id in mocks.microgrid.battery_inverter_ids @@ -318,6 +321,7 @@ async def test_case_2(self, mocks: Mocks, mocker: MockerFixture) -> None: self._assert_report( await bounds_1_rx.receive(), power=1000.0, lower=-2000.0, upper=2000.0 ) + await asyncio.sleep(0.0) # Wait for the power to be distributed. assert set_power.call_count == 2 assert sorted(set_power.call_args_list) == [ mocker.call(inv_id, 500.0) @@ -331,6 +335,7 @@ async def test_case_2(self, mocks: Mocks, mocker: MockerFixture) -> None: if not latest_dist_result_2.has_value(): bounds = await bounds_2_rx.receive() self._assert_report(bounds, power=1000.0, lower=-2000.0, upper=2000.0) + await asyncio.sleep(0.0) # Wait for the power to be distributed. assert set_power.call_count == 2 assert sorted(set_power.call_args_list) == [ mocker.call(inv_id, 500.0) @@ -375,7 +380,7 @@ async def test_case_3(self, mocks: Mocks, mocker: MockerFixture) -> None: self._assert_report( await bounds_2_rx.receive(), power=-1000.0, lower=-1000.0, upper=0.0 ) - + await asyncio.sleep(0.0) # Wait for the power to be distributed. assert set_power.call_count == 4 assert sorted(set_power.call_args_list) == [ mocker.call(inv_id, -250.0) @@ -394,7 +399,7 @@ async def test_case_3(self, mocks: Mocks, mocker: MockerFixture) -> None: if not latest_dist_result_2.has_value(): bounds = await bounds_2_rx.receive() self._assert_report(bounds, power=0.0, lower=-1000.0, upper=0.0) - + await asyncio.sleep(0.0) # Wait for the power to be distributed. assert set_power.call_count == 4 assert sorted(set_power.call_args_list) == [ mocker.call(inv_id, 0.0) for inv_id in mocks.microgrid.battery_inverter_ids @@ -428,6 +433,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None: self._assert_report( await bounds_rx.receive(), power=1000.0, lower=-4000.0, upper=4000.0 ) + await asyncio.sleep(0.0) # Wait for the power to be distributed. assert set_power.call_count == 4 assert sorted(set_power.call_args_list) == [ mocker.call(inv_id, 250.0) @@ -453,6 +459,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None: self._assert_report( await bounds_rx.receive(), power=400.0, lower=-4000.0, upper=4000.0 ) + await asyncio.sleep(0.0) # Wait for the power to be distributed. assert set_power.call_count == 4 assert sorted(set_power.call_args_list) == [ mocker.call(inv_id, 100.0) @@ -477,6 +484,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None: self._assert_report( await bounds_rx.receive(), power=0.0, lower=-4000.0, upper=4000.0 ) + await asyncio.sleep(0.0) # Wait for the power to be distributed. assert set_power.call_count == 4 assert sorted(set_power.call_args_list) == [ mocker.call(inv_id, 0.0) for inv_id in mocks.microgrid.battery_inverter_ids @@ -501,6 +509,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None: self._assert_report( await bounds_rx.receive(), power=-400.0, lower=-4000.0, upper=4000.0 ) + await asyncio.sleep(0.0) # Wait for the power to be distributed. assert set_power.call_count == 4 assert sorted(set_power.call_args_list) == [ mocker.call(inv_id, -100.0) @@ -586,7 +595,7 @@ async def test_case_5( # pylint: disable=too-many-statements,too-many-locals self._assert_report( await bounds_1_rx.receive(), power=200.0, lower=-1000.0, upper=1500.0 ) - + await asyncio.sleep(0.0) # Wait for the power to be distributed. assert set_power.call_count == 4 assert sorted(set_power.call_args_list) == [ mocker.call(inv_id, 50.0) for inv_id in mocks.microgrid.battery_inverter_ids @@ -624,6 +633,7 @@ async def test_case_5( # pylint: disable=too-many-statements,too-many-locals if dist_result.succeeded_power == Power.from_watts(720.0): break + await asyncio.sleep(0.0) # Wait for the power to be distributed. assert set_power.call_count == 4 assert sorted(set_power.call_args_list) == [ mocker.call(inv_id, 720.0 / 4) @@ -664,6 +674,7 @@ async def test_case_5( # pylint: disable=too-many-statements,too-many-locals if dist_result.succeeded_power == Power.from_watts(-280.0): break + await asyncio.sleep(0.0) # Wait for the power to be distributed. assert set_power.call_count == 4 assert sorted(set_power.call_args_list) == [ mocker.call(inv_id, -280.0 / 4)