Commit 3f06be6

WIP step env on containers
1 parent: 9e2f78e · commit: 3f06be6

15 files changed: +97 −122 lines


src/zenml/integrations/airflow/orchestrators/airflow_orchestrator.py

Lines changed: 2 additions & 2 deletions
@@ -195,7 +195,7 @@ def prepare_or_run_pipeline(
         self,
         deployment: "PipelineDeploymentResponse",
         stack: "Stack",
-        environment: Dict[str, str],
+        environment: Dict[str, Dict[str, str]],
         placeholder_run: Optional["PipelineRunResponse"] = None,
     ) -> Any:
         """Creates and writes an Airflow DAG zip file.
@@ -248,7 +248,7 @@ def prepare_or_run_pipeline(
                 docker_image=image,
                 command=command,
                 arguments=arguments,
-                environment=environment,
+                environment=environment[step_name],
                 operator_source=settings.operator,
                 operator_args=operator_args,
             )
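
Across these files the `environment` argument changes from one flat dict shared by every step to a mapping keyed by step name, so each container only receives its own variables. A minimal sketch of the new shape and lookup, with step names and values invented for illustration:

    from typing import Dict

    # Invented per-step mapping matching the new Dict[str, Dict[str, str]] shape.
    environment: Dict[str, Dict[str, str]] = {
        "loader": {"ZENML_LOG_LEVEL": "DEBUG", "DATA_BUCKET": "demo-bucket"},
        "trainer": {"ZENML_LOG_LEVEL": "INFO", "EPOCHS": "10"},
    }

    # Orchestrators now index by step name instead of reusing one shared dict:
    for step_name in environment:
        step_env = environment[step_name]
        print(step_name, step_env)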

src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py

Lines changed: 17 additions & 16 deletions
@@ -266,7 +266,7 @@ def prepare_or_run_pipeline(
         self,
         deployment: "PipelineDeploymentResponse",
         stack: "Stack",
-        environment: Dict[str, str],
+        environment: Dict[str, Dict[str, str]],
         placeholder_run: Optional["PipelineRunResponse"] = None,
     ) -> Iterator[Dict[str, MetadataType]]:
         """Prepares or runs a pipeline on Sagemaker.
@@ -299,23 +299,24 @@ def prepare_or_run_pipeline(

         session = self._get_sagemaker_session()

-        # Sagemaker does not allow environment variables longer than 256
-        # characters to be passed to Processor steps. If an environment variable
-        # is longer than 256 characters, we split it into multiple environment
-        # variables (chunks) and re-construct it on the other side using the
-        # custom entrypoint configuration.
-        split_environment_variables(
-            size_limit=SAGEMAKER_PROCESSOR_STEP_ENV_VAR_SIZE_LIMIT,
-            env=environment,
-        )
-
-        environment[ENV_ZENML_SAGEMAKER_RUN_ID] = (
-            ExecutionVariables.PIPELINE_EXECUTION_ARN
-        )
-
         sagemaker_steps = []
         for step_name, step in deployment.step_configurations.items():
-            step_environment = environment.copy()
+            step_environment = environment[step_name]
+
+            # Sagemaker does not allow environment variables longer than 256
+            # characters to be passed to Processor steps. If an environment variable
+            # is longer than 256 characters, we split it into multiple environment
+            # variables (chunks) and re-construct it on the other side using the
+            # custom entrypoint configuration.
+            split_environment_variables(
+                size_limit=SAGEMAKER_PROCESSOR_STEP_ENV_VAR_SIZE_LIMIT,
+                env=step_environment,
+            )
+
+            step_environment[ENV_ZENML_SAGEMAKER_RUN_ID] = (
+                ExecutionVariables.PIPELINE_EXECUTION_ARN
+            )
+
             image = self.get_image(deployment=deployment, step_name=step_name)
             command = SagemakerEntrypointConfiguration.get_entrypoint_command()
             arguments = (
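
Since chunking now happens inside the loop, each step's variables are split independently. A self-contained sketch of the splitting idea behind `split_environment_variables`; the `_CHUNK_` suffix scheme here is an assumption, not necessarily what ZenML's entrypoint uses:

    from typing import Dict

    def split_env_vars(env: Dict[str, str], size_limit: int = 256) -> None:
        """Split values longer than size_limit into suffixed chunk variables.

        Sketch only; the real chunk naming and reassembly live in ZenML's
        entrypoint configuration.
        """
        for key in list(env):
            value = env[key]
            if len(value) > size_limit:
                del env[key]
                for start in range(0, len(value), size_limit):
                    chunk = value[start : start + size_limit]
                    env[f"{key}_CHUNK_{start // size_limit}"] = chunk

    step_env = {"BIG": "x" * 600}
    split_env_vars(step_env)
    print(sorted(step_env))  # ['BIG_CHUNK_0', 'BIG_CHUNK_1', 'BIG_CHUNK_2']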

src/zenml/integrations/azure/orchestrators/azureml_orchestrator.py

Lines changed: 5 additions & 2 deletions
@@ -202,7 +202,7 @@ def prepare_or_run_pipeline(
         self,
         deployment: "PipelineDeploymentResponse",
         stack: "Stack",
-        environment: Dict[str, str],
+        environment: Dict[str, Dict[str, str]],
         placeholder_run: Optional["PipelineRunResponse"] = None,
     ) -> Iterator[Dict[str, MetadataType]]:
         """Prepares or runs a pipeline on AzureML.
@@ -243,6 +243,7 @@ def prepare_or_run_pipeline(
         # Create components
         components = {}
         for step_name, step in deployment.step_configurations.items():
+            step_environment = environment[step_name]
             # Get the image for each step
             image = self.get_image(deployment=deployment, step_name=step_name)

@@ -252,7 +253,9 @@ def prepare_or_run_pipeline(
                 AzureMLEntrypointConfiguration.get_entrypoint_arguments(
                     step_name=step_name,
                     deployment_id=deployment.id,
-                    zenml_env_variables=b64_encode(json.dumps(environment)),
+                    zenml_env_variables=b64_encode(
+                        json.dumps(step_environment)
+                    ),
                 )
             )

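
The step's variables travel to the AzureML container as base64-encoded JSON. A standard-library round trip showing what `b64_encode(json.dumps(...))` produces and how the entrypoint presumably reverses it (the decode side is an assumption about the entrypoint, not code from this commit):

    import base64
    import json

    step_environment = {"ZENML_LOG_LEVEL": "INFO"}  # illustrative values

    # Orchestrator side: serialize and encode the per-step env.
    encoded = base64.b64encode(json.dumps(step_environment).encode()).decode()

    # Container side: assumed mirror image of the encoding above.
    decoded = json.loads(base64.b64decode(encoded).decode())
    assert decoded == step_environment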

src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py

Lines changed: 4 additions & 2 deletions
@@ -558,6 +558,8 @@ def dynamic_pipeline() -> None:
                     step_settings.custom_job_parameters is not None
                 )

+                step_environment = environment[component_name]
+
                 if use_custom_training_job:
                     if not step.config.resource_settings.empty:
                         logger.warning(
@@ -576,7 +578,7 @@ def dynamic_pipeline() -> None:
                     component = self._convert_to_custom_training_job(
                         component,
                         settings=step_settings,
-                        environment=environment,
+                        environment=step_environment,
                     )
                     task = (
                         component()
@@ -593,7 +595,7 @@ def dynamic_pipeline() -> None:
                         .set_caching_options(enable_caching=False)
                         .after(*upstream_step_components)
                     )
-                    for key, value in environment.items():
+                    for key, value in step_environment.items():
                         task = task.set_env_variable(name=key, value=value)

                 pod_settings = step_settings.pod_settings
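
In the non-custom-job branch the variables are attached to the compiled task one by one. A minimal standalone KFP v2 sketch of that per-task pattern, with the component and values invented for illustration:

    from kfp import dsl

    @dsl.component
    def trainer() -> None:
        import os
        print(os.environ.get("ZENML_LOG_LEVEL"))

    @dsl.pipeline
    def demo_pipeline() -> None:
        step_environment = {"ZENML_LOG_LEVEL": "DEBUG"}  # illustrative
        task = trainer()
        # Same per-task pattern as in the diff above:
        for key, value in step_environment.items():
            task = task.set_env_variable(name=key, value=value)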

src/zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py

Lines changed: 6 additions & 38 deletions
@@ -72,7 +72,7 @@
 from zenml.orchestrators import ContainerizedOrchestrator
 from zenml.orchestrators.utils import get_orchestrator_run_name
 from zenml.stack import StackValidator
-from zenml.utils import io_utils, settings_utils, yaml_utils
+from zenml.utils import io_utils, settings_utils

 if TYPE_CHECKING:
     from zenml.models import PipelineDeploymentResponse, PipelineRunResponse
@@ -470,7 +470,7 @@ def prepare_or_run_pipeline(
         self,
         deployment: "PipelineDeploymentResponse",
         stack: "Stack",
-        environment: Dict[str, str],
+        environment: Dict[str, Dict[str, str]],
         placeholder_run: Optional["PipelineRunResponse"] = None,
     ) -> Any:
         """Creates a kfp yaml file.
@@ -590,6 +590,7 @@ def dynamic_pipeline() -> None:
                 component_name,
                 component,
             ) in step_name_to_dynamic_component.items():
+                step_environment = environment[component_name]
                 # for each component, check to see what other steps are
                 # upstream of it
                 step = deployment.step_configurations[component_name]
@@ -609,6 +610,9 @@ def dynamic_pipeline() -> None:
                     )
                     .after(*upstream_step_components)
                 )
+                for key, value in step_environment.items():
+                    task = task.set_env_variable(name=key, value=value)
+
                 self._configure_container_resources(
                     task,
                     step.config.resource_settings,
@@ -617,39 +621,6 @@ def dynamic_pipeline() -> None:

             return dynamic_pipeline

-        def _update_yaml_with_environment(
-            yaml_file_path: str, environment: Dict[str, str]
-        ) -> None:
-            """Updates the env section of the steps in the YAML file with the given environment variables.
-
-            Args:
-                yaml_file_path: The path to the YAML file to update.
-                environment: A dictionary of environment variables to add.
-            """
-            pipeline_definition = yaml_utils.read_yaml(pipeline_file_path)
-
-            # Iterate through each component and add the environment variables
-            for executor in pipeline_definition["deploymentSpec"]["executors"]:
-                if (
-                    "container"
-                    in pipeline_definition["deploymentSpec"]["executors"][
-                        executor
-                    ]
-                ):
-                    container = pipeline_definition["deploymentSpec"][
-                        "executors"
-                    ][executor]["container"]
-                    if "env" not in container:
-                        container["env"] = []
-                    for key, value in environment.items():
-                        container["env"].append({"name": key, "value": value})
-
-            yaml_utils.write_yaml(pipeline_file_path, pipeline_definition)
-
-            print(
-                f"Updated YAML file with environment variables at {yaml_file_path}"
-            )
-
         # Get a filepath to use to save the finished yaml to
         fileio.makedirs(self.pipeline_directory)
         pipeline_file_path = os.path.join(
@@ -663,9 +634,6 @@ def _update_yaml_with_environment(
             pipeline_name=orchestrator_run_name,
         )

-        # Let's update the YAML file with the environment variables
-        _update_yaml_with_environment(pipeline_file_path, environment)
-
         logger.info(
             "Writing Kubeflow workflow definition to `%s`.", pipeline_file_path
         )
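
With the variables now set on each task before compilation, they land in the compiled spec's executor containers directly, which is exactly the structure the deleted `_update_yaml_with_environment` helper patched in after the fact. A hand-written approximation of the relevant fragment of a compiled KFP spec, following the keys the removed helper navigated (real compiler output has many more fields):

    # Hand-written approximation, not actual KFP compiler output.
    pipeline_definition = {
        "deploymentSpec": {
            "executors": {
                "exec-trainer": {
                    "container": {
                        "image": "zenml-pipeline:latest",
                        "env": [
                            {"name": "ZENML_LOG_LEVEL", "value": "DEBUG"},
                        ],
                    }
                }
            }
        }
    }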

src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py

Lines changed: 6 additions & 3 deletions
@@ -392,7 +392,7 @@ def prepare_or_run_pipeline(
         self,
         deployment: "PipelineDeploymentResponse",
         stack: "Stack",
-        environment: Dict[str, str],
+        environment: Dict[str, Dict[str, str]],
         placeholder_run: Optional["PipelineRunResponse"] = None,
     ) -> Any:
         """Runs the pipeline in Kubernetes.
@@ -501,6 +501,9 @@ def prepare_or_run_pipeline(
             }
         )

+        # Use the env from any step for the orchestrator pod
+        orchestrator_pod_env = environment.popitem()[1]
+
         # Schedule as CRON job if CRON schedule is given.
         if deployment.schedule:
             if not deployment.schedule.cron_expression:
@@ -521,7 +524,7 @@ def prepare_or_run_pipeline(
                 service_account_name=service_account_name,
                 privileged=False,
                 pod_settings=orchestrator_pod_settings,
-                env=environment,
+                env=orchestrator_pod_env,
                 mount_local_stores=self.config.is_local,
             )

@@ -546,7 +549,7 @@ def prepare_or_run_pipeline(
                 privileged=False,
                 pod_settings=orchestrator_pod_settings,
                 service_account_name=service_account_name,
-                env=environment,
+                env=orchestrator_pod_env,
                 mount_local_stores=self.config.is_local,
             )
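
Note that `dict.popitem()` removes and returns an arbitrary entry (on CPython 3.7+ it is the last inserted one), so the orchestrator pod borrows one step's variables and that step's entry disappears from the mapping; given the WIP label, a non-destructive lookup may be the eventual intent. A short demonstration:

    environment = {
        "loader": {"A": "1"},
        "trainer": {"B": "2"},
    }  # illustrative per-step mapping

    orchestrator_pod_env = environment.popitem()[1]
    print(orchestrator_pod_env)      # {'B': '2'} on CPython 3.7+
    print("trainer" in environment)  # False: popitem mutated the mapping

    # Non-destructive alternative:
    # orchestrator_pod_env = next(iter(environment.values()))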

src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py

Lines changed: 10 additions & 2 deletions
@@ -45,6 +45,7 @@
     get_config_environment_vars,
     get_orchestrator_run_name,
 )
+from zenml.utils import env_utils

 logger = get_logger(__name__)

@@ -94,8 +95,8 @@ def main() -> None:
     kube_client = orchestrator.get_kube_client(incluster=True)
     core_api = k8s_client.CoreV1Api(kube_client)

-    env = get_config_environment_vars()
-    env[ENV_ZENML_KUBERNETES_RUN_ID] = orchestrator_run_id
+    shared_env = get_config_environment_vars()
+    shared_env[ENV_ZENML_KUBERNETES_RUN_ID] = orchestrator_run_id

     def run_step_on_kubernetes(step_name: str) -> None:
         """Run a pipeline step in a separate Kubernetes pod.
@@ -112,6 +113,13 @@ def run_step_on_kubernetes(step_name: str) -> None:
             settings.model_dump() if settings else {}
         )

+        step_env = shared_env.copy()
+        step_env.update(
+            env_utils.get_step_environment(
+                step_config=step_config, stack=active_stack
+            )
+        )
+
         if settings.pod_name_prefix and not orchestrator_run_id.startswith(
             settings.pod_name_prefix
         ):
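
The step pod env is assembled by copying the shared variables and layering the step-specific ones on top, so step values win on key collisions. A sketch with the `env_utils.get_step_environment` result stubbed out as an assumption:

    shared_env = {
        "ZENML_KUBERNETES_RUN_ID": "run-123",
        "ZENML_LOG_LEVEL": "INFO",
    }

    # Stand-in for env_utils.get_step_environment(step_config=..., stack=...):
    step_specific = {"ZENML_LOG_LEVEL": "DEBUG"}

    step_env = shared_env.copy()
    step_env.update(step_specific)
    print(step_env["ZENML_LOG_LEVEL"])  # 'DEBUG': step values override shared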

src/zenml/integrations/tekton/orchestrators/tekton_orchestrator.py

Lines changed: 5 additions & 38 deletions
@@ -49,7 +49,7 @@
 from zenml.orchestrators import ContainerizedOrchestrator
 from zenml.orchestrators.utils import get_orchestrator_run_name
 from zenml.stack import StackValidator
-from zenml.utils import io_utils, yaml_utils
+from zenml.utils import io_utils

 if TYPE_CHECKING:
     from zenml.config.base_settings import BaseSettings
@@ -459,7 +459,7 @@ def prepare_or_run_pipeline(
         self,
         deployment: "PipelineDeploymentResponse",
         stack: "Stack",
-        environment: Dict[str, str],
+        environment: Dict[str, Dict[str, str]],
         placeholder_run: Optional["PipelineRunResponse"] = None,
     ) -> Any:
         """Runs the pipeline on Tekton.
@@ -559,6 +559,7 @@ def dynamic_pipeline() -> None:
                 component_name,
                 component,
             ) in step_name_to_dynamic_component.items():
+                step_environment = environment[component_name]
                 # for each component, check to see what other steps are
                 # upstream of it
                 step = deployment.step_configurations[component_name]
@@ -578,6 +579,8 @@ def dynamic_pipeline() -> None:
                     )
                     .after(*upstream_step_components)
                 )
+                for key, value in step_environment.items():
+                    task = task.set_env_variable(name=key, value=value)
                 self._configure_container_resources(
                     task,
                     step.config.resource_settings,
@@ -586,39 +589,6 @@ def dynamic_pipeline() -> None:

             return dynamic_pipeline

-        def _update_yaml_with_environment(
-            yaml_file_path: str, environment: Dict[str, str]
-        ) -> None:
-            """Updates the env section of the steps in the YAML file with the given environment variables.
-
-            Args:
-                yaml_file_path: The path to the YAML file to update.
-                environment: A dictionary of environment variables to add.
-            """
-            pipeline_definition = yaml_utils.read_yaml(pipeline_file_path)
-
-            # Iterate through each component and add the environment variables
-            for executor in pipeline_definition["deploymentSpec"]["executors"]:
-                if (
-                    "container"
-                    in pipeline_definition["deploymentSpec"]["executors"][
-                        executor
-                    ]
-                ):
-                    container = pipeline_definition["deploymentSpec"][
-                        "executors"
-                    ][executor]["container"]
-                    if "env" not in container:
-                        container["env"] = []
-                    for key, value in environment.items():
-                        container["env"].append({"name": key, "value": value})
-
-            yaml_utils.write_yaml(pipeline_file_path, pipeline_definition)
-
-            print(
-                f"Updated YAML file with environment variables at {yaml_file_path}"
-            )
-
         # Get a filepath to use to save the finished yaml to
         fileio.makedirs(self.pipeline_directory)
         pipeline_file_path = os.path.join(
@@ -631,9 +601,6 @@ def _update_yaml_with_environment(
             pipeline_name=orchestrator_run_name,
         )

-        # Let's update the YAML file with the environment variables
-        _update_yaml_with_environment(pipeline_file_path, environment)
-
         logger.info(
             "Writing Tekton workflow definition to `%s`.", pipeline_file_path
         )
