@@ -309,8 +309,13 @@ def prepare_or_run_pipeline(
309
309
env = environment ,
310
310
)
311
311
312
+ environment [ENV_ZENML_SAGEMAKER_RUN_ID ] = (
313
+ ExecutionVariables .PIPELINE_EXECUTION_ARN
314
+ )
315
+
312
316
sagemaker_steps = []
313
317
for step_name , step in deployment .step_configurations .items ():
318
+ step_environment = environment .copy ()
314
319
image = self .get_image (deployment = deployment , step_name = step_name )
315
320
command = SagemakerEntrypointConfiguration .get_entrypoint_command ()
316
321
arguments = (
@@ -324,22 +329,18 @@ def prepare_or_run_pipeline(
324
329
SagemakerOrchestratorSettings , self .get_settings (step )
325
330
)
326
331
327
- environment [ENV_ZENML_SAGEMAKER_RUN_ID ] = (
328
- ExecutionVariables .PIPELINE_EXECUTION_ARN
329
- )
330
-
331
332
if step_settings .environment :
332
- step_environment = step_settings .environment .copy ()
333
+ user_defined_environment = step_settings .environment .copy ()
333
334
# Sagemaker does not allow environment variables longer than 256
334
335
# characters to be passed to Processor steps. If an environment variable
335
336
# is longer than 256 characters, we split it into multiple environment
336
337
# variables (chunks) and re-construct it on the other side using the
337
338
# custom entrypoint configuration.
338
339
split_environment_variables (
339
340
size_limit = SAGEMAKER_PROCESSOR_STEP_ENV_VAR_SIZE_LIMIT ,
340
- env = step_environment ,
341
+ env = user_defined_environment ,
341
342
)
342
- environment .update (step_environment )
343
+ step_environment .update (user_defined_environment )
343
344
344
345
use_training_step = (
345
346
step_settings .use_training_step
@@ -476,19 +477,19 @@ def prepare_or_run_pipeline(
476
477
)
477
478
478
479
# Convert environment to a dict of strings
479
- environment = {
480
+ step_environment = {
480
481
key : str (value )
481
482
if not isinstance (value , ExecutionVariable )
482
483
else value
483
- for key , value in environment .items ()
484
+ for key , value in step_environment .items ()
484
485
}
485
486
486
487
if use_training_step :
487
488
# Create Estimator and TrainingStep
488
489
estimator = sagemaker .estimator .Estimator (
489
490
keep_alive_period_in_seconds = step_settings .keep_alive_period_in_seconds ,
490
491
output_path = output_path ,
491
- environment = environment ,
492
+ environment = step_environment ,
492
493
container_entry_point = entrypoint ,
493
494
** args_for_step_executor ,
494
495
)
@@ -502,7 +503,7 @@ def prepare_or_run_pipeline(
502
503
# Create Processor and ProcessingStep
503
504
processor = sagemaker .processing .Processor (
504
505
entrypoint = entrypoint ,
505
- env = environment ,
506
+ env = step_environment ,
506
507
** args_for_step_executor ,
507
508
)
508
509
0 commit comments