Skip to content

Commit 038b9c3

Browse files
authored
Fix potential race condition during run status update (#3720)
* Fix potential race condition during run status update * Release lock earlier when not changing status * Linting
1 parent d54120c commit 038b9c3

File tree

1 file changed

+84
-79
lines changed

1 file changed

+84
-79
lines changed

src/zenml/zen_stores/sql_zen_store.py

Lines changed: 84 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -8925,6 +8925,7 @@ def _update_pipeline_run_status(
89258925

89268926
pipeline_run = session.exec(
89278927
select(PipelineRunSchema)
8928+
.with_for_update()
89288929
.options(
89298930
joinedload(
89308931
jl_arg(PipelineRunSchema.deployment), innerjoin=True
@@ -8950,96 +8951,100 @@ def _update_pipeline_run_status(
89508951
num_steps=num_steps,
89518952
)
89528953

8953-
if pipeline_run.is_placeholder_run() and not new_status.is_finished:
8954-
# If the pipeline run is a placeholder run, no step has been started
8955-
# for the run yet. This means the orchestrator hasn't started
8954+
if new_status == pipeline_run.status or (
8955+
pipeline_run.is_placeholder_run() and not new_status.is_finished
8956+
):
8957+
# The status hasn't changed -> no need to update the status.
8958+
# If the pipeline run is a placeholder run (=no step has been started
8959+
# for the run yet), this means the orchestrator hasn't started
89568960
# running yet, and this method is most likely being called as
89578961
# part of the creation of some cached steps. In this case, we don't
89588962
# update the status unless the run is finished.
8963+
8964+
# Commit so that we release the lock on the pipeline run.
8965+
session.commit()
89598966
return
89608967

8961-
if new_status != pipeline_run.status:
8962-
run_update = PipelineRunUpdate(status=new_status)
8963-
if new_status in {
8964-
ExecutionStatus.COMPLETED,
8965-
ExecutionStatus.FAILED,
8966-
}:
8967-
run_update.end_time = utc_now()
8968-
if pipeline_run.start_time and isinstance(
8969-
pipeline_run.start_time, datetime
8970-
):
8971-
duration_time = (
8972-
run_update.end_time - pipeline_run.start_time
8973-
)
8974-
duration_seconds = duration_time.total_seconds()
8975-
start_time_str = pipeline_run.start_time.strftime(
8968+
run_update = PipelineRunUpdate(status=new_status)
8969+
if new_status.is_finished:
8970+
run_update.end_time = utc_now()
8971+
8972+
pipeline_run.update(run_update)
8973+
session.add(pipeline_run)
8974+
# Commit so that we release the lock on the pipeline run.
8975+
session.commit()
8976+
8977+
if new_status.is_finished:
8978+
assert run_update.end_time
8979+
if pipeline_run.start_time:
8980+
duration_time = run_update.end_time - pipeline_run.start_time
8981+
duration_seconds = duration_time.total_seconds()
8982+
start_time_str = pipeline_run.start_time.strftime(
8983+
"%Y-%m-%dT%H:%M:%S.%fZ"
8984+
)
8985+
else:
8986+
start_time_str = None
8987+
duration_seconds = None
8988+
8989+
stack = pipeline_run.deployment.stack
8990+
assert stack
8991+
stack_metadata = {
8992+
str(component.type): component.flavor
8993+
for component in stack.components
8994+
}
8995+
with track_handler(
8996+
AnalyticsEvent.RUN_PIPELINE_ENDED
8997+
) as analytics_handler:
8998+
analytics_handler.metadata = {
8999+
"project_id": pipeline_run.project_id,
9000+
"pipeline_run_id": pipeline_run_id,
9001+
"template_id": pipeline_run.deployment.template_id,
9002+
"status": new_status,
9003+
"num_steps": num_steps,
9004+
"start_time": start_time_str,
9005+
"end_time": run_update.end_time.strftime(
89769006
"%Y-%m-%dT%H:%M:%S.%fZ"
8977-
)
8978-
else:
8979-
start_time_str = None
8980-
duration_seconds = None
8981-
8982-
stack = pipeline_run.deployment.stack
8983-
assert stack
8984-
stack_metadata = {
8985-
str(component.type): component.flavor
8986-
for component in stack.components
9007+
),
9008+
"duration_seconds": duration_seconds,
9009+
**stack_metadata,
89879010
}
8988-
with track_handler(
8989-
AnalyticsEvent.RUN_PIPELINE_ENDED
8990-
) as analytics_handler:
8991-
analytics_handler.metadata = {
8992-
"project_id": pipeline_run.project_id,
8993-
"pipeline_run_id": pipeline_run_id,
8994-
"template_id": pipeline_run.deployment.template_id,
8995-
"status": new_status,
8996-
"num_steps": num_steps,
8997-
"start_time": start_time_str,
8998-
"end_time": run_update.end_time.strftime(
8999-
"%Y-%m-%dT%H:%M:%S.%fZ"
9000-
),
9001-
"duration_seconds": duration_seconds,
9002-
**stack_metadata,
9003-
}
90049011

9005-
completed_onboarding_steps: Set[str] = {
9006-
OnboardingStep.PIPELINE_RUN,
9007-
OnboardingStep.STARTER_SETUP_COMPLETED,
9008-
}
9009-
if stack_metadata["orchestrator"] not in {
9010-
"local",
9011-
"local_docker",
9012-
}:
9013-
completed_onboarding_steps.update(
9014-
{
9015-
OnboardingStep.PIPELINE_RUN_WITH_REMOTE_ORCHESTRATOR,
9016-
}
9012+
completed_onboarding_steps: Set[str] = {
9013+
OnboardingStep.PIPELINE_RUN,
9014+
OnboardingStep.STARTER_SETUP_COMPLETED,
9015+
}
9016+
if stack_metadata["orchestrator"] not in {
9017+
"local",
9018+
"local_docker",
9019+
}:
9020+
completed_onboarding_steps.update(
9021+
{
9022+
OnboardingStep.PIPELINE_RUN_WITH_REMOTE_ORCHESTRATOR,
9023+
}
9024+
)
9025+
if stack_metadata["artifact_store"] != "local":
9026+
completed_onboarding_steps.update(
9027+
{
9028+
OnboardingStep.PIPELINE_RUN_WITH_REMOTE_ARTIFACT_STORE,
9029+
OnboardingStep.PRODUCTION_SETUP_COMPLETED,
9030+
}
9031+
)
9032+
if OnboardingStep.THIRD_PIPELINE_RUN not in (
9033+
self._cached_onboarding_state or {}
9034+
):
9035+
onboarding_state = self.get_onboarding_state()
9036+
if OnboardingStep.PIPELINE_RUN in onboarding_state:
9037+
completed_onboarding_steps.add(
9038+
OnboardingStep.SECOND_PIPELINE_RUN
90179039
)
9018-
if stack_metadata["artifact_store"] != "local":
9019-
completed_onboarding_steps.update(
9020-
{
9021-
OnboardingStep.PIPELINE_RUN_WITH_REMOTE_ARTIFACT_STORE,
9022-
OnboardingStep.PRODUCTION_SETUP_COMPLETED,
9023-
}
9040+
if OnboardingStep.SECOND_PIPELINE_RUN in onboarding_state:
9041+
completed_onboarding_steps.add(
9042+
OnboardingStep.THIRD_PIPELINE_RUN
90249043
)
9025-
if OnboardingStep.THIRD_PIPELINE_RUN not in (
9026-
self._cached_onboarding_state or {}
9027-
):
9028-
onboarding_state = self.get_onboarding_state()
9029-
if OnboardingStep.PIPELINE_RUN in onboarding_state:
9030-
completed_onboarding_steps.add(
9031-
OnboardingStep.SECOND_PIPELINE_RUN
9032-
)
9033-
if OnboardingStep.SECOND_PIPELINE_RUN in onboarding_state:
9034-
completed_onboarding_steps.add(
9035-
OnboardingStep.THIRD_PIPELINE_RUN
9036-
)
90379044

9038-
self._update_onboarding_state(
9039-
completed_steps=completed_onboarding_steps, session=session
9040-
)
9041-
pipeline_run.update(run_update)
9042-
session.add(pipeline_run)
9045+
self._update_onboarding_state(
9046+
completed_steps=completed_onboarding_steps, session=session
9047+
)
90439048

90449049
# --------------------------- Triggers ---------------------------
90459050

0 commit comments

Comments
 (0)