Skip to content

Prevent stacking of power requests #1023

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,4 @@
- Fixed a bug that was causing the `PowerDistributor` to exit if power requests to PV inverters or EV chargers timeout.
- Fix handling of cancelled tasks in the data sourcing and resampling actor.
- Fix PV power distribution excluding inverters that haven't sent any data since startup.
- Prevent stacking of power requests to avoid delays in processing when the power requests frequency exceeds the processing time.
129 changes: 97 additions & 32 deletions src/frequenz/sdk/actor/power_distributing/power_distributing.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH

"""Actor to distribute power between batteries.
"""Actor to distribute power between components.

When charge/discharge method is called the power should be distributed so that
the SoC in batteries stays at the same level. That way of distribution
prevents using only one battery, increasing temperature, and maximize the total
amount power to charge/discharge.
The purpose of this actor is to distribute power between components in a microgrid.

Purpose of this actor is to keep SoC level of each component at the equal level.
The actor receives power requests from the power manager, process them by
distributing the power between the components and sends the results back to it.
"""


import asyncio
import logging
from datetime import timedelta

from frequenz.channels import Receiver, Sender
Expand All @@ -29,31 +29,33 @@
from .request import Request
from .result import Result

_logger = logging.getLogger(__name__)


class PowerDistributingActor(Actor):
# pylint: disable=too-many-instance-attributes
"""Actor to distribute the power between batteries in a microgrid.

The purpose of this tool is to keep an equal SoC level in all batteries.
The PowerDistributingActor can have many concurrent users which at this time
need to be known at construction time.
"""Actor to distribute the power between components in a microgrid.

For each user a bidirectional channel needs to be created through which
they can send and receive requests and responses.
One instance of the actor can handle only one component category and type,
which needs to be specified at actor startup and it will setup the correct
component manager based on the given category and type.

It is recommended to wait for PowerDistributingActor output with timeout. Otherwise if
the processing function fails then the response will never come.
The timeout should be Result:request_timeout + time for processing the request.
Only one power request is processed at a time to prevent from sending
multiple requests for the same components to the microgrid API at the
same time.

Edge cases:
* If there are 2 requests to be processed for the same subset of batteries, then
only the latest request will be processed. Older request will be ignored. User with
older request will get response with Result.Status.IGNORED.

* If there are 2 requests and their subset of batteries is different but they
overlap (they have at least one common battery), then then both batteries
will be processed. However it is not expected so the proper error log will be
printed.
* If a new power request is received while a power request with the same
set of components is being processed, the new request will be added to
the pending requests. Then the pending request will be processed after the
request with the same set of components being processed is done. Only one
pending request is kept for each set of components, the latest request will
overwrite the previous one if there is any.

* If there are 2 requests and their set of components is different but they
overlap (they have at least one common component), then both requests will
be processed concurrently. Though, the power manager will make sure this
doesn't happen as overlapping component IDs are not possible at the moment.
"""

def __init__( # pylint: disable=too-many-arguments
Expand All @@ -67,7 +69,7 @@ def __init__( # pylint: disable=too-many-arguments
component_type: ComponentType | None = None,
name: str | None = None,
) -> None:
"""Create class instance.
"""Create actor instance.

Args:
requests_receiver: Receiver for receiving power requests from the power
Expand Down Expand Up @@ -99,6 +101,16 @@ def __init__( # pylint: disable=too-many-arguments
self._result_sender = results_sender
self._api_power_request_timeout = api_power_request_timeout

self._processing_tasks: dict[frozenset[int], asyncio.Task[None]] = {}
"""Track the power request tasks currently being processed."""

self._pending_requests: dict[frozenset[int], Request] = {}
"""Track the power requests that are waiting to be processed.

Only one pending power request is kept for each set of components, the
latest request will overwrite the previous one.
"""

self._component_manager: ComponentManager
if component_category == ComponentCategory.BATTERY:
self._component_manager = BatteryManager(
Expand All @@ -121,19 +133,34 @@ def __init__( # pylint: disable=too-many-arguments
)

@override
async def _run(self) -> None: # pylint: disable=too-many-locals
"""Run actor main function.
async def _run(self) -> None:
"""Run this actor's logic.

It waits for new power requests and process them. Only one power request
can be processed at a time to prevent from sending multiple requests for
the same components to the microgrid API at the same time.

It waits for new requests in task_queue and process it, and send
`set_power` request with distributed power.
The output of the `set_power` method is processed.
Every battery and inverter that failed or didn't respond in time will be marked
A new power request will be ignored if a power request with the same
components is currently being processed.

Every component that failed or didn't respond in time will be marked
as broken for some time.
"""
await self._component_manager.start()

async for request in self._requests_receiver:
await self._component_manager.distribute_power(request)
req_id = frozenset(request.component_ids)

if req_id in self._processing_tasks:
if pending_request := self._pending_requests.get(req_id):
_logger.debug(
"Pending request: %s, overwritten with request: %s",
pending_request,
request,
)
self._pending_requests[req_id] = request
else:
self._process_request(req_id, request)

@override
async def stop(self, msg: str | None = None) -> None:
Expand All @@ -144,3 +171,41 @@ async def stop(self, msg: str | None = None) -> None:
"""
await self._component_manager.stop()
await super().stop(msg)

def _handle_task_completion(
self, req_id: frozenset[int], request: Request, task: asyncio.Task[None]
) -> None:
"""Handle the completion of a power request task.

Args:
req_id: The id to identify the power request.
request: The power request that has been processed.
task: The task that has completed.
"""
try:
task.result()
except Exception: # pylint: disable=broad-except
_logger.exception("Failed power request: %s", request)

if req_id in self._pending_requests:
self._process_request(req_id, self._pending_requests.pop(req_id))
elif req_id in self._processing_tasks:
del self._processing_tasks[req_id]
else:
_logger.error("Request id not found in processing tasks: %s", req_id)

def _process_request(self, req_id: frozenset[int], request: Request) -> None:
"""Process a power request.

Args:
req_id: The id to identify the power request.
request: The power request to process.
"""
task = asyncio.create_task(
self._component_manager.distribute_power(request),
name=f"{type(self).__name__}:{request}",
)
task.add_done_callback(
lambda t: self._handle_task_completion(req_id, request, t)
)
self._processing_tasks[req_id] = task
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ async def test_case_1(
await bounds_rx.receive(), power=1000.0, lower=-4000.0, upper=4000.0
)

await asyncio.sleep(0.0) # Wait for the power to be distributed.
assert set_power.call_count == 4
assert sorted(set_power.call_args_list) == [
mocker.call(inv_id, 250.0)
Expand Down Expand Up @@ -248,6 +249,7 @@ async def side_effect(inv_id: int, _: float) -> None:
result, power_distributing.Success
),
)
await asyncio.sleep(0.0) # Wait for the power to be distributed.
assert set_power.call_count == 4
assert sorted(set_power.call_args_list) == [
mocker.call(inv_id, 25.0) for inv_id in mocks.microgrid.battery_inverter_ids
Expand All @@ -267,6 +269,7 @@ async def side_effect(inv_id: int, _: float) -> None:

# There should be an automatic retry.
set_power.side_effect = None
await asyncio.sleep(0.0) # Wait for the power to be distributed.
assert set_power.call_count == 4
assert sorted(set_power.call_args_list) == [
mocker.call(inv_id, 25.0) for inv_id in mocks.microgrid.battery_inverter_ids
Expand Down Expand Up @@ -318,6 +321,7 @@ async def test_case_2(self, mocks: Mocks, mocker: MockerFixture) -> None:
self._assert_report(
await bounds_1_rx.receive(), power=1000.0, lower=-2000.0, upper=2000.0
)
await asyncio.sleep(0.0) # Wait for the power to be distributed.
assert set_power.call_count == 2
assert sorted(set_power.call_args_list) == [
mocker.call(inv_id, 500.0)
Expand All @@ -331,6 +335,7 @@ async def test_case_2(self, mocks: Mocks, mocker: MockerFixture) -> None:
if not latest_dist_result_2.has_value():
bounds = await bounds_2_rx.receive()
self._assert_report(bounds, power=1000.0, lower=-2000.0, upper=2000.0)
await asyncio.sleep(0.0) # Wait for the power to be distributed.
assert set_power.call_count == 2
assert sorted(set_power.call_args_list) == [
mocker.call(inv_id, 500.0)
Expand Down Expand Up @@ -375,7 +380,7 @@ async def test_case_3(self, mocks: Mocks, mocker: MockerFixture) -> None:
self._assert_report(
await bounds_2_rx.receive(), power=-1000.0, lower=-1000.0, upper=0.0
)

await asyncio.sleep(0.0) # Wait for the power to be distributed.
assert set_power.call_count == 4
assert sorted(set_power.call_args_list) == [
mocker.call(inv_id, -250.0)
Expand All @@ -394,7 +399,7 @@ async def test_case_3(self, mocks: Mocks, mocker: MockerFixture) -> None:
if not latest_dist_result_2.has_value():
bounds = await bounds_2_rx.receive()
self._assert_report(bounds, power=0.0, lower=-1000.0, upper=0.0)

await asyncio.sleep(0.0) # Wait for the power to be distributed.
assert set_power.call_count == 4
assert sorted(set_power.call_args_list) == [
mocker.call(inv_id, 0.0) for inv_id in mocks.microgrid.battery_inverter_ids
Expand Down Expand Up @@ -428,6 +433,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None:
self._assert_report(
await bounds_rx.receive(), power=1000.0, lower=-4000.0, upper=4000.0
)
await asyncio.sleep(0.0) # Wait for the power to be distributed.
assert set_power.call_count == 4
assert sorted(set_power.call_args_list) == [
mocker.call(inv_id, 250.0)
Expand All @@ -453,6 +459,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None:
self._assert_report(
await bounds_rx.receive(), power=400.0, lower=-4000.0, upper=4000.0
)
await asyncio.sleep(0.0) # Wait for the power to be distributed.
assert set_power.call_count == 4
assert sorted(set_power.call_args_list) == [
mocker.call(inv_id, 100.0)
Expand All @@ -477,6 +484,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None:
self._assert_report(
await bounds_rx.receive(), power=0.0, lower=-4000.0, upper=4000.0
)
await asyncio.sleep(0.0) # Wait for the power to be distributed.
assert set_power.call_count == 4
assert sorted(set_power.call_args_list) == [
mocker.call(inv_id, 0.0) for inv_id in mocks.microgrid.battery_inverter_ids
Expand All @@ -501,6 +509,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None:
self._assert_report(
await bounds_rx.receive(), power=-400.0, lower=-4000.0, upper=4000.0
)
await asyncio.sleep(0.0) # Wait for the power to be distributed.
assert set_power.call_count == 4
assert sorted(set_power.call_args_list) == [
mocker.call(inv_id, -100.0)
Expand Down Expand Up @@ -586,7 +595,7 @@ async def test_case_5( # pylint: disable=too-many-statements,too-many-locals
self._assert_report(
await bounds_1_rx.receive(), power=200.0, lower=-1000.0, upper=1500.0
)

await asyncio.sleep(0.0) # Wait for the power to be distributed.
assert set_power.call_count == 4
assert sorted(set_power.call_args_list) == [
mocker.call(inv_id, 50.0) for inv_id in mocks.microgrid.battery_inverter_ids
Expand Down Expand Up @@ -624,6 +633,7 @@ async def test_case_5( # pylint: disable=too-many-statements,too-many-locals
if dist_result.succeeded_power == Power.from_watts(720.0):
break

await asyncio.sleep(0.0) # Wait for the power to be distributed.
assert set_power.call_count == 4
assert sorted(set_power.call_args_list) == [
mocker.call(inv_id, 720.0 / 4)
Expand Down Expand Up @@ -664,6 +674,7 @@ async def test_case_5( # pylint: disable=too-many-statements,too-many-locals
if dist_result.succeeded_power == Power.from_watts(-280.0):
break

await asyncio.sleep(0.0) # Wait for the power to be distributed.
assert set_power.call_count == 4
assert sorted(set_power.call_args_list) == [
mocker.call(inv_id, -280.0 / 4)
Expand Down
Loading