Skip to content

Commit 74f53d2

Browse files
committed
Use aws batch request model adapter
Signed-off-by: Paolo Di Tommaso <[email protected]>
1 parent 9b04ec4 commit 74f53d2

File tree

8 files changed

+1490
-155
lines changed

8 files changed

+1490
-155
lines changed

plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy

Lines changed: 60 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,48 @@
1616

1717
package nextflow.cloud.aws.batch
1818

19-
import static AwsContainerOptionsMapper.*
19+
20+
import static nextflow.cloud.aws.batch.AwsContainerOptionsMapper.*
2021

2122
import java.nio.file.Path
2223
import java.nio.file.Paths
2324
import java.time.Instant
2425

26+
import groovy.transform.Canonical
27+
import groovy.transform.CompileStatic
28+
import groovy.transform.Memoized
29+
import groovy.util.logging.Slf4j
30+
import nextflow.BuildInfo
31+
import nextflow.SysEnv
32+
import nextflow.cloud.aws.batch.model.ContainerPropertiesModel
33+
import nextflow.cloud.aws.batch.model.RegisterJobDefinitionModel
34+
import nextflow.cloud.types.CloudMachineInfo
35+
import nextflow.container.ContainerNameValidator
36+
import nextflow.exception.ProcessException
37+
import nextflow.exception.ProcessSubmitException
38+
import nextflow.exception.ProcessUnrecoverableException
39+
import nextflow.executor.BashWrapperBuilder
40+
import nextflow.fusion.FusionAwareTask
41+
import nextflow.processor.BatchContext
42+
import nextflow.processor.BatchHandler
43+
import nextflow.processor.TaskArrayRun
44+
import nextflow.processor.TaskHandler
45+
import nextflow.processor.TaskRun
46+
import nextflow.processor.TaskStatus
47+
import nextflow.trace.TraceRecord
48+
import nextflow.util.CacheHelper
49+
import nextflow.util.MemoryUnit
50+
import nextflow.util.TestOnly
2551
import software.amazon.awssdk.services.batch.BatchClient
26-
import software.amazon.awssdk.services.batch.model.BatchException
2752
import software.amazon.awssdk.services.batch.model.ArrayProperties
2853
import software.amazon.awssdk.services.batch.model.AssignPublicIp
2954
import software.amazon.awssdk.services.batch.model.AttemptContainerDetail
55+
import software.amazon.awssdk.services.batch.model.BatchException
3056
import software.amazon.awssdk.services.batch.model.ClientException
3157
import software.amazon.awssdk.services.batch.model.ContainerOverrides
32-
import software.amazon.awssdk.services.batch.model.ContainerProperties
3358
import software.amazon.awssdk.services.batch.model.DescribeJobDefinitionsRequest
3459
import software.amazon.awssdk.services.batch.model.DescribeJobDefinitionsResponse
3560
import software.amazon.awssdk.services.batch.model.DescribeJobsRequest
36-
import software.amazon.awssdk.services.batch.model.DescribeJobsResponse
3761
import software.amazon.awssdk.services.batch.model.EphemeralStorage
3862
import software.amazon.awssdk.services.batch.model.EvaluateOnExit
3963
import software.amazon.awssdk.services.batch.model.Host
@@ -57,29 +81,6 @@ import software.amazon.awssdk.services.batch.model.SubmitJobRequest
5781
import software.amazon.awssdk.services.batch.model.SubmitJobResponse
5882
import software.amazon.awssdk.services.batch.model.TerminateJobRequest
5983
import software.amazon.awssdk.services.batch.model.Volume
60-
import groovy.transform.Canonical
61-
import groovy.transform.CompileStatic
62-
import groovy.transform.Memoized
63-
import groovy.util.logging.Slf4j
64-
import nextflow.BuildInfo
65-
import nextflow.SysEnv
66-
import nextflow.cloud.types.CloudMachineInfo
67-
import nextflow.container.ContainerNameValidator
68-
import nextflow.exception.ProcessException
69-
import nextflow.exception.ProcessSubmitException
70-
import nextflow.exception.ProcessUnrecoverableException
71-
import nextflow.executor.BashWrapperBuilder
72-
import nextflow.fusion.FusionAwareTask
73-
import nextflow.processor.BatchContext
74-
import nextflow.processor.BatchHandler
75-
import nextflow.processor.TaskArrayRun
76-
import nextflow.processor.TaskHandler
77-
import nextflow.processor.TaskRun
78-
import nextflow.processor.TaskStatus
79-
import nextflow.trace.TraceRecord
80-
import nextflow.util.CacheHelper
81-
import nextflow.util.MemoryUnit
82-
import nextflow.util.TestOnly
8384
/**
8485
* Implements a task handler for AWS Batch jobs
8586
*/
@@ -488,10 +489,9 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
488489

489490
@CompileStatic
490491
protected String resolveJobDefinition0(TaskRun task) {
491-
final builder = makeJobDefRequest(task)
492-
final req = builder.build()
492+
final req = makeJobDefRequest(task)
493493
final container = task.getContainer()
494-
final token = req.parameters().get('nf-token')
494+
final token = req.parameters.get('nf-token')
495495
final jobKey = "$container:$token".toString()
496496
if( jobDefinitions.containsKey(jobKey) )
497497
return jobDefinitions[jobKey]
@@ -501,12 +501,12 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
501501
return jobDefinitions[jobKey]
502502

503503
def msg
504-
def name = findJobDef(req.jobDefinitionName(), token)
504+
def name = findJobDef(req.jobDefinitionName, token)
505505
if( name ) {
506506
msg = "[AWS BATCH] Found job definition name=$name; container=$container"
507507
}
508508
else {
509-
name = createJobDef(builder)
509+
name = createJobDef(req)
510510
msg = "[AWS BATCH] Created job definition name=$name; container=$container"
511511
}
512512
// log the request
@@ -526,7 +526,8 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
526526
* @param image The Docker container image for which is required to create a Batch job definition
527527
* @return An instance of {@link software.amazon.awssdk.services.batch.model.RegisterJobDefinitionRequest.Builder} for the specified Docker image
528528
*/
529-
protected RegisterJobDefinitionRequest.Builder makeJobDefRequest(TaskRun task) {
529+
@CompileStatic
530+
protected RegisterJobDefinitionModel makeJobDefRequest(TaskRun task) {
530531
final uniq = new ArrayList()
531532
final result = configJobDefRequest(task, uniq)
532533

@@ -552,43 +553,43 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
552553
* @return
553554
* An instance of {@link software.amazon.awssdk.services.batch.model.RegisterJobDefinitionRequest} for the specified Docker image
554555
*/
555-
protected RegisterJobDefinitionRequest.Builder configJobDefRequest(TaskRun task, List hashingTokens) {
556+
@CompileStatic
557+
protected RegisterJobDefinitionModel configJobDefRequest(TaskRun task, List hashingTokens) {
556558
final image = task.getContainer()
557559
final name = normalizeJobDefinitionName(image)
558560
final opts = getAwsOptions()
559561

560-
final result = RegisterJobDefinitionRequest.builder()
562+
final result = new RegisterJobDefinitionModel()
561563
result.jobDefinitionName(name)
562564
result.type(JobDefinitionType.CONTAINER)
563565

564566
// create the container opts based on task config
565-
final builder = ContainerProperties.builder()
566567
final containerOpts = task.getConfig().getContainerOptionsMap()
567-
addCmdOptions(containerOpts, builder)
568+
final container = createContainerProperties(containerOpts)
568569

569570
// container definition
570571
// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-cpu-memory-error.html
571572
final reqCpus = ResourceRequirement.builder().type(ResourceType.VCPU).value('1').build()
572573
final reqMem = ResourceRequirement.builder().type(ResourceType.MEMORY).value( opts.fargateMode ? '2048' : '1024').build()
573-
builder
574+
container
574575
.image(image)
575576
.command('true')
576577
// note the actual command, memory and cpus are overridden when the job is executed
577578
.resourceRequirements( reqCpus, reqMem )
578579

579580
final jobRole = opts.getJobRole()
580581
if( jobRole )
581-
builder.jobRoleArn(jobRole)
582+
container.jobRoleArn(jobRole)
582583

583584
if( opts.executionRole )
584-
builder.executionRoleArn(opts.executionRole)
585+
container.executionRoleArn(opts.executionRole)
585586

586587
final logsGroup = opts.getLogsGroup()
587588
if( logsGroup )
588-
builder.logConfiguration(getLogConfiguration(logsGroup, opts.getRegion()))
589+
container.logConfiguration(getLogConfiguration(logsGroup, opts.getRegion()))
589590

590591
if( fusionEnabled() )
591-
builder.privileged(true)
592+
container.privileged(true)
592593

593594
final mountsMap = new LinkedHashMap( 10)
594595
final awscli = opts.cliPath
@@ -604,20 +605,20 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
604605
}
605606

606607
if( mountsMap )
607-
addVolumeMountsToContainer(mountsMap, builder)
608+
addVolumeMountsToContainer(mountsMap, container)
608609

609610
// Fargate specific settings
610611
if( opts.isFargateMode() ) {
611612
result.platformCapabilities(List.of(PlatformCapability.FARGATE))
612-
builder.networkConfiguration( NetworkConfiguration.builder().assignPublicIp(AssignPublicIp.ENABLED).build() )
613+
container.networkConfiguration( NetworkConfiguration.builder().assignPublicIp(AssignPublicIp.ENABLED).build() )
613614
// use at least 50 GB as disk local storage
614615
final diskGb = task.config.getDisk()?.toGiga()?.toInteger() ?: 50
615-
builder.ephemeralStorage( EphemeralStorage.builder().sizeInGiB(diskGb).build() )
616+
container.ephemeralStorage( EphemeralStorage.builder().sizeInGiB(diskGb).build() )
616617
// check for arm64 cpu architecture
617618
if( task.config.getArchitecture()?.arch == 'arm64' )
618-
builder.runtimePlatform(RuntimePlatform.builder().cpuArchitecture('ARM64').build())
619+
container.runtimePlatform(RuntimePlatform.builder().cpuArchitecture('ARM64').build())
619620
}
620-
final container = builder.build()
621+
621622
// finally set the container options
622623
result.containerProperties(container)
623624

@@ -641,7 +642,8 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
641642
]).build()
642643
}
643644

644-
protected void addVolumeMountsToContainer(Map<String,String> mountsMap, ContainerProperties.Builder container) {
645+
@CompileStatic
646+
protected void addVolumeMountsToContainer(Map<String,String> mountsMap, ContainerPropertiesModel container) {
645647
final mounts = new ArrayList<MountPoint>(mountsMap.size())
646648
final volumes = new ArrayList<Volume>(mountsMap.size())
647649
for( Map.Entry<String,String> entry : mountsMap.entrySet() ) {
@@ -656,12 +658,14 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
656658
def mount = MountPoint.builder()
657659
.sourceVolume(mountName)
658660
.containerPath(hostPath)
659-
.readOnly(readOnly).build()
661+
.readOnly(readOnly)
662+
.build()
660663
mounts << mount
661664

662665
def vol = Volume.builder()
663666
.name(mountName)
664-
.host(Host.builder().sourcePath(containerPath).build()).build()
667+
.host(Host.builder().sourcePath(containerPath).build())
668+
.build()
665669
volumes << vol
666670
}
667671

@@ -698,17 +702,16 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
698702
/**
699703
* Create (aka register) a new Batch job definition
700704
*
701-
* @param req A {@link RegisterJobDefinitionRequest} representing the Batch jib definition to create
705+
* @param model A {@link RegisterJobDefinitionRequest} representing the Batch jib definition to create
702706
* @return The fully qualified Batch job definition name eg {@code my-job-definition:3}
703707
*/
704-
protected String createJobDef(RegisterJobDefinitionRequest.Builder builder) {
708+
protected String createJobDef(RegisterJobDefinitionModel model) {
705709
// add nextflow tags
706-
builder.tags([
707-
'nextflow.io/createdAt': Instant.now().toString(),
708-
'nextflow.io/version': BuildInfo.version
709-
])
710+
model.addTagsEntry('nextflow.io/createdAt', Instant.now().toString())
711+
model.addTagsEntry('nextflow.io/version', BuildInfo.version)
710712
// create the job def
711-
final res = createJobDef0(bypassProxy(client), builder.build() as RegisterJobDefinitionRequest) // bypass the client proxy! see #1024
713+
final req = model.toBatchRequest()
714+
final res = createJobDef0(bypassProxy(client), req) // bypass the client proxy! see #1024
712715
return "${res.jobDefinitionName()}:${res.revision()}"
713716
}
714717

plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsContainerOptionsMapper.groovy

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package nextflow.cloud.aws.batch
1717

18-
import software.amazon.awssdk.services.batch.model.ContainerProperties
18+
import nextflow.cloud.aws.batch.model.ContainerPropertiesModel
1919
import software.amazon.awssdk.services.batch.model.KeyValuePair
2020
import software.amazon.awssdk.services.batch.model.LinuxParameters
2121
import software.amazon.awssdk.services.batch.model.Tmpfs
@@ -36,35 +36,31 @@ import nextflow.util.MemoryUnit
3636
class AwsContainerOptionsMapper {
3737

3838
@Deprecated
39-
static ContainerProperties createContainerOpts(CmdLineOptionMap options) {
39+
static ContainerPropertiesModel createContainerOpts(CmdLineOptionMap options) {
4040
return createContainerProperties(options)
4141
}
4242

43-
static ContainerProperties createContainerProperties(CmdLineOptionMap options) {
44-
final builder = ContainerProperties.builder()
45-
addCmdOptions(options, builder)
46-
return builder.build()
47-
}
48-
49-
static void addCmdOptions(CmdLineOptionMap options, ContainerProperties.Builder builder){
43+
static ContainerPropertiesModel createContainerProperties(CmdLineOptionMap options) {
44+
final containerProperties = new ContainerPropertiesModel()
5045
if ( options?.hasOptions() ) {
51-
checkPrivileged(options, builder)
52-
checkEnvVars(options, builder)
53-
checkUser(options, builder)
54-
checkReadOnly(options, builder)
55-
checkUlimit(options, builder)
46+
checkPrivileged(options, containerProperties)
47+
checkEnvVars(options, containerProperties)
48+
checkUser(options, containerProperties)
49+
checkReadOnly(options, containerProperties)
50+
checkUlimit(options, containerProperties)
5651
LinuxParameters params = checkLinuxParameters(options)
5752
if ( params != null )
58-
builder.linuxParameters(params)
53+
containerProperties.linuxParameters(params)
5954
}
55+
return containerProperties
6056
}
6157

62-
protected static void checkPrivileged(CmdLineOptionMap options, ContainerProperties.Builder containerProperties) {
58+
protected static void checkPrivileged(CmdLineOptionMap options, ContainerPropertiesModel containerProperties) {
6359
if ( findOptionWithBooleanValue(options, 'privileged') )
64-
containerProperties.privileged(true);
60+
containerProperties.privileged(true)
6561
}
6662

67-
protected static void checkEnvVars(CmdLineOptionMap options, ContainerProperties.Builder containerProperties) {
63+
protected static void checkEnvVars(CmdLineOptionMap options, ContainerPropertiesModel containerProperties) {
6864
final keyValuePairs = new ArrayList<KeyValuePair>()
6965
List<String> values = findOptionWithMultipleValues(options, 'env')
7066
values.addAll(findOptionWithMultipleValues(options, 'e'))
@@ -76,20 +72,20 @@ class AwsContainerOptionsMapper {
7672
containerProperties.environment(keyValuePairs)
7773
}
7874

79-
protected static void checkUser(CmdLineOptionMap options, ContainerProperties.Builder containerProperties) {
75+
protected static void checkUser(CmdLineOptionMap options, ContainerPropertiesModel containerProperties) {
8076
String user = findOptionWithSingleValue(options, 'u')
8177
if ( !user)
8278
user = findOptionWithSingleValue(options, 'user')
8379
if ( user )
8480
containerProperties.user(user)
8581
}
8682

87-
protected static void checkReadOnly(CmdLineOptionMap options, ContainerProperties.Builder containerProperties) {
83+
protected static void checkReadOnly(CmdLineOptionMap options, ContainerPropertiesModel containerProperties) {
8884
if ( findOptionWithBooleanValue(options, 'read-only') )
8985
containerProperties.readonlyRootFilesystem(true);
9086
}
9187

92-
protected static void checkUlimit(CmdLineOptionMap options, ContainerProperties.Builder containerProperties) {
88+
protected static void checkUlimit(CmdLineOptionMap options, ContainerPropertiesModel containerProperties) {
9389
final ulimits = new ArrayList<Ulimit>()
9490
findOptionWithMultipleValues(options, 'ulimit').each { value ->
9591
final tokens = value.tokenize('=')

0 commit comments

Comments
 (0)