Skip to content

Commit e116143

Browse files
authored
MRG: Merge pull request #673 from octue/improve-event-filtering
Improve event filtering
2 parents f0c5eaf + a40c12a commit e116143

19 files changed

+442
-160
lines changed

.devcontainer/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV HISTFILE="/workspaces/octue-sdk-python/.devcontainer/.zsh_history"
1616
USER vscode
1717
ENV POETRY_HOME=/home/vscode/.poetry
1818
RUN curl -sSL https://install.python-poetry.org | python3 -
19-
ENV PATH "$POETRY_HOME/bin:$PATH"
19+
ENV PATH="$POETRY_HOME/bin:$PATH"
2020
RUN poetry config virtualenvs.create false

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ RUN apt-get update && apt-get install -y git curl
44

55
ENV POETRY_HOME=/etc/poetry
66
RUN curl -sSL https://install.python-poetry.org | python3 -
7-
ENV PATH "$POETRY_HOME/bin:$PATH"
7+
ENV PATH="$POETRY_HOME/bin:$PATH"
88
RUN poetry config virtualenvs.create false
99

1010
# Install python dependencies. Note that poetry installs any root packages by default, but this is not available at this

docs/source/inter_service_compatibility.rst

Lines changed: 87 additions & 85 deletions
Large diffs are not rendered by default.

octue/cloud/deployment/google/cloud_run/Dockerfile-python310

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
FROM windpioneers/gdal-python:little-gecko-gdal-2.4.1-python-3.10-slim
22

33
# Ensure print statements and log messages appear promptly in Cloud Logging.
4-
ENV PYTHONUNBUFFERED True
4+
ENV PYTHONUNBUFFERED=True
55

66
ENV PROJECT_ROOT=/workspace
77
WORKDIR $PROJECT_ROOT
@@ -10,7 +10,7 @@ RUN apt-get update -y && apt-get install -y --fix-missing build-essential && rm
1010

1111
# Install poetry.
1212
ENV POETRY_HOME=/root/.poetry
13-
ENV PATH "$POETRY_HOME/bin:$PATH"
13+
ENV PATH="$POETRY_HOME/bin:$PATH"
1414
RUN curl -sSL https://install.python-poetry.org | python3 - && poetry config virtualenvs.create false;
1515

1616
# Copy in the dependencies file(s) for caching. One or more of `requirements.txt`, `setup.py`, and `pyproject.toml` and

octue/cloud/deployment/google/cloud_run/Dockerfile-python311

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
FROM windpioneers/gdal-python:modest-heron-gdal-2.4.1-python-3.11-slim
22

33
# Ensure print statements and log messages appear promptly in Cloud Logging.
4-
ENV PYTHONUNBUFFERED True
4+
ENV PYTHONUNBUFFERED=True
55

66
ENV PROJECT_ROOT=/workspace
77
WORKDIR $PROJECT_ROOT
@@ -10,7 +10,7 @@ RUN apt-get update -y && apt-get install -y --fix-missing build-essential && rm
1010

1111
# Install poetry.
1212
ENV POETRY_HOME=/root/.poetry
13-
ENV PATH "$POETRY_HOME/bin:$PATH"
13+
ENV PATH="$POETRY_HOME/bin:$PATH"
1414
RUN curl -sSL https://install.python-poetry.org | python3 - && poetry config virtualenvs.create false;
1515

1616
# Copy in the dependencies file(s) for caching. One or more of `requirements.txt`, `setup.py`, and `pyproject.toml` and

octue/cloud/deployment/google/cloud_run/Dockerfile-python39

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
FROM windpioneers/gdal-python:little-gecko-gdal-2.4.1-python-3.9-slim
22

33
# Ensure print statements and log messages appear promptly in Cloud Logging.
4-
ENV PYTHONUNBUFFERED True
4+
ENV PYTHONUNBUFFERED=True
55

66
ENV PROJECT_ROOT=/workspace
77
WORKDIR $PROJECT_ROOT
@@ -10,7 +10,7 @@ RUN apt-get update -y && apt-get install -y --fix-missing build-essential && rm
1010

1111
# Install poetry.
1212
ENV POETRY_HOME=/root/.poetry
13-
ENV PATH "$POETRY_HOME/bin:$PATH"
13+
ENV PATH="$POETRY_HOME/bin:$PATH"
1414
RUN curl -sSL https://install.python-poetry.org | python3 - && poetry config virtualenvs.create false;
1515

1616
# Copy in the dependencies file(s) for caching. One or more of `requirements.txt`, `setup.py`, and `pyproject.toml` and

octue/cloud/deployment/google/cloud_run/flask_app.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ def _check_if_should_drop_question(question_uuid, retry_count):
8181
previous_question_attempts = get_events(
8282
table_id=service_configuration.event_store_table_id,
8383
question_uuid=question_uuid,
84-
kind="delivery_acknowledgement",
84+
exclude_kinds=["question"],
8585
)
8686

8787
except google.api_core.exceptions.NotFound:

octue/cloud/events/handler.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class AbstractEventHandler:
3939
:param dict|None event_handlers: a mapping of event type names to callables that handle each type of event. The handlers must not mutate the events.
4040
:param dict schema: the JSON schema to validate events against
4141
:param bool include_service_metadata_in_logs: if `True`, add the SRUIDs and question UUIDs of the service revisions involved in the question to the start of the log message
42+
:param str|None exclude_logs_containing: if provided, skip handling log messages containing this string
4243
:param bool only_handle_result: if `True`, skip handling non-result events and only handle the "result" event when received (turning this on speeds up event handling)
4344
:param bool validate_events: if `True`, validate events before attempting to handle them (turning this off speeds up event handling)
4445
:return None:
@@ -51,13 +52,15 @@ def __init__(
5152
event_handlers=None,
5253
schema=SERVICE_COMMUNICATION_SCHEMA,
5354
include_service_metadata_in_logs=True,
55+
exclude_logs_containing=None,
5456
only_handle_result=False,
5557
validate_events=True,
5658
):
5759
self.handle_monitor_message = handle_monitor_message
5860
self.record_events = record_events
5961
self.schema = schema
6062
self.include_service_metadata_in_logs = include_service_metadata_in_logs
63+
self.exclude_logs_containing = exclude_logs_containing
6164
self.only_handle_result = only_handle_result
6265
self.validate_events = validate_events
6366

@@ -201,6 +204,9 @@ def _handle_log_message(self, event, attributes):
201204
:param dict attributes: the event's attributes
202205
:return None:
203206
"""
207+
if self.exclude_logs_containing and self.exclude_logs_containing in event["log_record"]["msg"]:
208+
return
209+
204210
record = logging.makeLogRecord(event["log_record"])
205211

206212
# Split the log message into its parts.

octue/cloud/events/replayer.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,17 @@
88

99

1010
class EventReplayer(AbstractEventHandler):
11-
"""A replayer for events retrieved asynchronously from storage.
11+
"""A replayer for events retrieved asynchronously from storage. Note that events aren't validated by default as the
12+
main use case for this class is to replay already-validated events from the event store.
1213
1314
:param callable|None handle_monitor_message: a function to handle monitor messages (e.g. send them to an endpoint for plotting or displaying) - this function should take a single JSON-compatible python primitive
1415
:param bool record_events: if `True`, record received events in the `received_events` attribute
1516
:param dict|None event_handlers: a mapping of event type names to callables that handle each type of event. The handlers must not mutate the events.
1617
:param dict|str schema: the JSON schema to validate events against
1718
:param bool include_service_metadata_in_logs: if `True`, add the SRUIDs and question UUIDs of the service revisions involved in the question to the start of the log message
19+
:param str|None exclude_logs_containing: if provided, skip handling log messages containing this string
1820
:param bool only_handle_result: if `True`, skip non-result events and only handle the "result" event if present (turning this on speeds up event handling)
19-
:param bool validate_events: if `True`, validate events before attempting to handle them (turning this off speeds up event handling)
21+
:param bool validate_events: if `True`, validate events before attempting to handle them (this is off by default to speed up event handling)
2022
:return None:
2123
"""
2224

@@ -27,8 +29,9 @@ def __init__(
2729
event_handlers=None,
2830
schema=SERVICE_COMMUNICATION_SCHEMA,
2931
include_service_metadata_in_logs=True,
32+
exclude_logs_containing=None,
3033
only_handle_result=False,
31-
validate_events=True,
34+
validate_events=False,
3235
):
3336
event_handlers = event_handlers or {
3437
"question": self._handle_question,
@@ -46,6 +49,7 @@ def __init__(
4649
event_handlers=event_handlers,
4750
schema=schema,
4851
include_service_metadata_in_logs=include_service_metadata_in_logs,
52+
exclude_logs_containing=exclude_logs_containing,
4953
only_handle_result=only_handle_result,
5054
validate_events=validate_events,
5155
)

octue/cloud/pub_sub/bigquery.py

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ def get_events(
2828
question_uuid=None,
2929
parent_question_uuid=None,
3030
originator_question_uuid=None,
31-
kind=None,
31+
kinds=None,
32+
exclude_kinds=None,
3233
include_backend_metadata=False,
3334
tail=True,
3435
limit=1000,
@@ -47,13 +48,14 @@ def get_events(
4748
:param str|None question_uuid: the UUID of a question to get events for
4849
:param str|None parent_question_uuid: the UUID of a parent question to get the sub-question events for
4950
:param str|None originator_question_uuid: the UUID of an originator question to get the full tree of events for
50-
:param str|None kind: the kind of event to get; if `None`, all event kinds are returned
51+
:param iter(str)|None kinds: the kinds of event to get; if `None`, all event kinds are returned
52+
:param iter(str)|None exclude_kinds: the kinds of event to exclude; if `None`, no event kinds are excluded
5153
:param bool include_backend_metadata: if `True`, include the service backend metadata
5254
:param bool tail: if `True`, return the most recent events (where a limit applies); e.g. return the most recent 100 log records
5355
:param int limit: the maximum number of events to return
5456
:return list(dict): the events for the question; this will be empty if there are no events for the question
5557
"""
56-
_validate_inputs(question_uuid, parent_question_uuid, originator_question_uuid, kind)
58+
_validate_inputs(question_uuid, parent_question_uuid, originator_question_uuid, kinds, exclude_kinds)
5759

5860
if question_uuid:
5961
question_uuid_condition = "WHERE question_uuid=@relevant_question_uuid"
@@ -62,10 +64,17 @@ def get_events(
6264
elif originator_question_uuid:
6365
question_uuid_condition = "WHERE originator_question_uuid=@relevant_question_uuid"
6466

65-
if kind:
66-
event_kind_condition = [f"AND kind={kind!r}"]
67+
if kinds:
68+
kinds_string = ", ".join([f"{kind!r}" for kind in kinds])
69+
event_kinds_condition = [f"AND kind IN ({kinds_string})"]
6770
else:
68-
event_kind_condition = []
71+
event_kinds_condition = []
72+
73+
if exclude_kinds:
74+
exclude_kinds_string = ", ".join([f"{kind!r}" for kind in exclude_kinds])
75+
exclude_kinds_condition = [f"AND kind NOT IN ({exclude_kinds_string})"]
76+
else:
77+
exclude_kinds_condition = []
6978

7079
# Make a shallow copy of the fields to query.
7180
fields = list(DEFAULT_FIELDS)
@@ -77,7 +86,8 @@ def get_events(
7786
[
7887
f"SELECT {', '.join(fields)} FROM `{table_id}`",
7988
question_uuid_condition,
80-
*event_kind_condition,
89+
*event_kinds_condition,
90+
*exclude_kinds_condition,
8191
]
8292
)
8393

@@ -116,14 +126,15 @@ def get_events(
116126
return _unflatten_events(events)
117127

118128

119-
def _validate_inputs(question_uuid, parent_question_uuid, originator_question_uuid, kind):
129+
def _validate_inputs(question_uuid, parent_question_uuid, originator_question_uuid, kinds, exclude_kinds):
120130
"""Check that only one of `question_uuid`, `parent_question_uuid`, or `originator_question_uuid` are provided and
121131
that at most one of `kinds` and `exclude_kinds` is provided, with every item in them being a valid event kind.
122132
123133
:param str|None question_uuid: the UUID of a question to get events for
124134
:param str|None parent_question_uuid: the UUID of a parent question to get the sub-question events for
125135
:param str|None originator_question_uuid: the UUID of an originator question to get the full tree of events for
126-
:param str|None kind: the kind of event to get; if `None`, all event kinds are returned
136+
:param iter(str)|None kinds: the kinds of event to get; if `None`, all event kinds are returned
137+
:param iter(str)|None exclude_kinds: the kinds of event to exclude; if `None`, no event kinds are excluded
127138
:raise ValueError: if more than one of `question_uuid`, `parent_question_uuid`, or `originator_question_uuid` are provided, if both `kinds` and `exclude_kinds` are provided, or if any item in `kinds` or `exclude_kinds` is not a valid event kind
128139
:return None:
129140
"""
@@ -135,8 +146,25 @@ def _validate_inputs(question_uuid, parent_question_uuid, originator_question_uu
135146
"provided."
136147
)
137148

138-
if kind and kind not in VALID_EVENT_KINDS:
139-
raise ValueError(f"`kind` must be one of {VALID_EVENT_KINDS!r}; received {kind!r}.")
149+
kinds_inputs = (bool(kinds), bool(exclude_kinds))
150+
151+
if sum(kinds_inputs) > 1:
152+
raise ValueError(
153+
f"Only one of `kinds` and `exclude_kinds` can be provided at once; received kinds={kinds!r} and "
154+
f"exclude_kinds={exclude_kinds!r}."
155+
)
156+
157+
if kinds:
158+
for kind in kinds:
159+
if kind not in VALID_EVENT_KINDS:
160+
raise ValueError(f"All items in `kinds` must be one of {VALID_EVENT_KINDS!r}; received {kind!r}.")
161+
162+
if exclude_kinds:
163+
for kind in exclude_kinds:
164+
if kind not in VALID_EVENT_KINDS:
165+
raise ValueError(
166+
f"All items in `exclude_kinds` must be one of {VALID_EVENT_KINDS!r}; received {kind!r}."
167+
)
140168

141169

142170
def _deserialise_event(event):

octue/cloud/pub_sub/events.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ class GoogleCloudPubSubEventHandler(AbstractEventHandler):
7070
:param dict|None event_handlers: a mapping of event type names to callables that handle each type of event. The handlers must not mutate the events.
7171
:param dict|str schema: the JSON schema to validate events against
7272
:param bool include_service_metadata_in_logs: if `True`, add the SRUIDs and question UUIDs of the service revisions involved in the question to the start of the log message
73+
:param str|None exclude_logs_containing: if provided, skip handling log messages containing this string
74+
:param bool only_handle_result: if `True`, skip non-result events and only handle the "result" event if present (turning this on speeds up event handling)
75+
:param bool validate_events: if `True`, validate events before attempting to handle them (turn this off to speed up event handling at risk of failure if an invalid event is received)
7376
:return None:
7477
"""
7578

@@ -81,6 +84,9 @@ def __init__(
8184
event_handlers=None,
8285
schema=SERVICE_COMMUNICATION_SCHEMA,
8386
include_service_metadata_in_logs=True,
87+
exclude_logs_containing=None,
88+
only_handle_result=False,
89+
validate_events=True,
8490
):
8591
self.subscription = subscription
8692

@@ -90,6 +96,9 @@ def __init__(
9096
event_handlers=event_handlers,
9197
schema=schema,
9298
include_service_metadata_in_logs=include_service_metadata_in_logs,
99+
exclude_logs_containing=exclude_logs_containing,
100+
only_handle_result=only_handle_result,
101+
validate_events=validate_events,
93102
)
94103

95104
self._heartbeat_checker = None

0 commit comments

Comments
 (0)