Skip to content

Improved Kubernetes orchestrator pod caching #3719

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Jun 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0877a4b
Improved Kubernetes orchestrator pod caching
schustmi Jun 2, 2025
30e212f
Enable for scheduled deployments
schustmi Jun 2, 2025
175e38e
Improve placeholder run detection
schustmi Jun 2, 2025
598b436
Store orchestrator run ID, improve labeling
schustmi Jun 2, 2025
b1d69b2
Index DB migration
schustmi Jun 2, 2025
a771454
Docstring
schustmi Jun 2, 2025
b780b48
Remove unnecessary index
schustmi Jun 3, 2025
53d14d4
Merge branch 'develop' into feature/kubernetes-pod-caching-v2
schustmi Jun 3, 2025
54be72d
Order placeholder runs
schustmi Jun 4, 2025
7b3eba9
Merge branch 'develop' into feature/kubernetes-pod-caching-v2
schustmi Jun 4, 2025
e81154d
Fix tests
schustmi Jun 4, 2025
685d26e
Merge remote-tracking branch 'origin/develop' into feature/kubernetes…
schustmi Jun 4, 2025
6f3f45c
Reuse docstring/source code from cache candidate
schustmi Jun 5, 2025
6c8791f
Use more portable sorting
schustmi Jun 5, 2025
1609a9e
Merge branch 'develop' into feature/kubernetes-pod-caching-v2
schustmi Jun 16, 2025
237a95a
Merge branch 'develop' into feature/kubernetes-pod-caching-v2
schustmi Jun 24, 2025
c167a69
Formatting after merge
schustmi Jun 24, 2025
233e33f
Linting
schustmi Jun 24, 2025
7dcf476
Apply suggestions from code review
schustmi Jun 25, 2025
ec5e7a3
Add run name label
schustmi Jun 25, 2025
50a1098
Fetch step runs for failed nodes in batches
schustmi Jun 25, 2025
a5c32c7
Merge branch 'develop' into feature/kubernetes-pod-caching-v2
schustmi Jun 25, 2025
99073c5
Merge branch 'develop' into feature/kubernetes-pod-caching-v2
schustmi Jun 26, 2025
e4b8190
Reduce wait time, compute cache after acquiring lock
schustmi Jun 26, 2025
2b4b0ec
Merge branch 'develop' into feature/kubernetes-pod-caching-v2
schustmi Jun 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
from kubernetes import config as k8s_config

from zenml.config.base_settings import BaseSettings
from zenml.constants import (
METADATA_ORCHESTRATOR_RUN_ID,
)
from zenml.enums import StackComponentType
from zenml.integrations.kubernetes.flavors.kubernetes_orchestrator_flavor import (
KubernetesOrchestratorConfig,
Expand All @@ -61,6 +64,7 @@
)
from zenml.integrations.kubernetes.pod_settings import KubernetesPodSettings
from zenml.logger import get_logger
from zenml.metadata.metadata_types import MetadataType
from zenml.orchestrators import ContainerizedOrchestrator, SubmissionResult
from zenml.orchestrators.utils import get_orchestrator_run_name
from zenml.stack import StackValidator
Expand Down Expand Up @@ -467,9 +471,7 @@ def submit_pipeline(
# This will internally also build the command/args for all step pods.
command = KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_command()
args = KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
run_name=orchestrator_run_name,
deployment_id=deployment.id,
kubernetes_namespace=self.config.kubernetes_namespace,
run_id=placeholder_run.id if placeholder_run else None,
)

Expand Down Expand Up @@ -508,6 +510,18 @@ def submit_pipeline(
}
)

orchestrator_pod_labels = {
"pipeline": kube_utils.sanitize_label(pipeline_name),
}

if placeholder_run:
orchestrator_pod_labels["run_id"] = kube_utils.sanitize_label(
str(placeholder_run.id)
)
orchestrator_pod_labels["run_name"] = kube_utils.sanitize_label(
str(placeholder_run.name)
)

# Schedule as CRON job if CRON schedule is given.
if deployment.schedule:
if not deployment.schedule.cron_expression:
Expand All @@ -519,9 +533,7 @@ def submit_pipeline(
cron_expression = deployment.schedule.cron_expression
cron_job_manifest = build_cron_job_manifest(
cron_expression=cron_expression,
run_name=orchestrator_run_name,
pod_name=pod_name,
pipeline_name=pipeline_name,
image_name=image,
command=command,
args=args,
Expand All @@ -533,6 +545,7 @@ def submit_pipeline(
successful_jobs_history_limit=settings.successful_jobs_history_limit,
failed_jobs_history_limit=settings.failed_jobs_history_limit,
ttl_seconds_after_finished=settings.ttl_seconds_after_finished,
labels=orchestrator_pod_labels,
)

self._k8s_batch_api.create_namespaced_cron_job(
Expand All @@ -547,16 +560,15 @@ def submit_pipeline(
else:
# Create and run the orchestrator pod.
pod_manifest = build_pod_manifest(
run_name=orchestrator_run_name,
pod_name=pod_name,
pipeline_name=pipeline_name,
image_name=image,
command=command,
args=args,
privileged=False,
pod_settings=orchestrator_pod_settings,
service_account_name=service_account_name,
env=environment,
labels=orchestrator_pod_labels,
mount_local_stores=self.config.is_local,
)

Expand All @@ -572,6 +584,11 @@ def submit_pipeline(
startup_timeout=settings.pod_startup_timeout,
)

metadata: Dict[str, MetadataType] = {
METADATA_ORCHESTRATOR_RUN_ID: pod_name,
}

# Wait for the orchestrator pod to finish and stream logs.
if settings.synchronous:

def _wait_for_run_to_finish() -> None:
Expand All @@ -588,7 +605,8 @@ def _wait_for_run_to_finish() -> None:
)

return SubmissionResult(
wait_for_completion=_wait_for_run_to_finish
metadata=metadata,
wait_for_completion=_wait_for_run_to_finish,
)
else:
logger.info(
Expand All @@ -597,7 +615,9 @@ def _wait_for_run_to_finish() -> None:
f"Run the following command to inspect the logs: "
f"`kubectl logs {pod_name} -n {self.config.kubernetes_namespace}`."
)
return None
return SubmissionResult(
metadata=metadata,
)

def _get_service_account_name(
self, settings: KubernetesOrchestratorSettings
Expand Down Expand Up @@ -642,3 +662,18 @@ def get_orchestrator_run_id(self) -> str:
"Unable to read run id from environment variable "
f"{ENV_ZENML_KUBERNETES_RUN_ID}."
)

def get_pipeline_run_metadata(
self, run_id: UUID
) -> Dict[str, "MetadataType"]:
"""Get general component-specific metadata for a pipeline run.

Args:
run_id: The ID of the pipeline run.

Returns:
A dictionary of metadata.
"""
return {
METADATA_ORCHESTRATOR_RUN_ID: self.get_orchestrator_run_id(),
}
Loading
Loading