From 92c58a6df861823f52239d55d87b2bfd861792e4 Mon Sep 17 00:00:00 2001 From: adamrtalbot <12817534+adamrtalbot@users.noreply.github.com> Date: Wed, 4 Jun 2025 11:25:59 +0100 Subject: [PATCH 1/3] feat: Azure Batch eagerly terminates jobs after all tasks have been submitted Azure Batch "job leak" is still an issue. This commit fixes #5839 which allows Nextflow to set jobs to auto terminate when all tasks have been submitted. This means that eventually jobs will move into terminated state even if something prevents nextflow reaching a graceful shutdown. Very early implementation and needs some refinement. Signed-off-by: adamrtalbot <12817534+adamrtalbot@users.noreply.github.com> --- .../azure/batch/AzBatchProcessObserver.groovy | 77 ++++++++++ .../AzBatchProcessObserverFactory.groovy | 45 ++++++ .../cloud/azure/batch/AzBatchService.groovy | 32 ++++ .../batch/AzBatchProcessObserverTest.groovy | 141 ++++++++++++++++++ 4 files changed, 295 insertions(+) create mode 100644 plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserver.groovy create mode 100644 plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserverFactory.groovy create mode 100644 plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchProcessObserverTest.groovy diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserver.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserver.groovy new file mode 100644 index 0000000000..16bad8b830 --- /dev/null +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserver.groovy @@ -0,0 +1,77 @@ +/* + * Copyright 2021, Microsoft Corp + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.cloud.azure.batch + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import nextflow.Session +import nextflow.processor.TaskProcessor +import nextflow.trace.TraceObserver + +/** + * Observer that handles process termination events for Azure Batch executor. + * When a process terminates (all tasks have been submitted), this observer + * will eagerly set the corresponding Azure Batch job to auto-terminate when + * all tasks complete. + * + * @author Adam Talbot + */ +@Slf4j +@CompileStatic +class AzBatchProcessObserver implements TraceObserver { + + private Session session + + AzBatchProcessObserver(Session session) { + this.session = session + } + + /** + * Called when a process terminates (all tasks have been submitted). + * This method will find the Azure Batch job ID associated with the process + * and set it to auto-terminate when all tasks complete. + */ + @Override + void onProcessTerminate(TaskProcessor processor) { + // Check if this process uses the Azure Batch executor + if( !(processor.executor instanceof AzBatchExecutor) ) { + return + } + + final executor = processor.executor as AzBatchExecutor + final batchService = executor.getBatchService() + if( !batchService?.config?.batch()?.terminateJobsOnCompletion ) { + log.trace "Azure Batch job auto-termination is disabled, skipping eager termination for process: ${processor.name}" + return + } + + try { + // Find all job IDs associated with this processor + final jobIds = batchService.allJobIds.findAll { key, jobId -> + key.processor == processor + }.values() + + for( String jobId : jobIds ) { + log.debug "Setting Azure Batch job ${jobId} to auto-terminate for completed process: ${processor.name}" + batchService.setJobAutoTermination(jobId) + } + } + catch( Exception e ) { + log.warn "Failed to set auto-termination for Azure Batch jobs associated with process '${processor.name}' - Reason: ${e.message ?: e}" + } + } +} \ No newline at end of file diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserverFactory.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserverFactory.groovy new file mode 100644 index 0000000000..1715a9afbc --- /dev/null +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserverFactory.groovy @@ -0,0 +1,45 @@ +/* + * Copyright 2021, Microsoft Corp + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.cloud.azure.batch + +import groovy.transform.CompileStatic +import nextflow.Session +import nextflow.trace.TraceObserver +import nextflow.trace.TraceObserverFactory + +/** + * Factory for creating the Azure Batch process observer that enables eager termination + * of Azure Batch jobs when processes complete. + * + * @author Adam Talbot + */ +@CompileStatic +class AzBatchProcessObserverFactory implements TraceObserverFactory { + + @Override + Collection create(Session session) { + final result = new ArrayList() + + // Only create the observer if using Azure Batch executor + final executorName = session.config.navigate('process.executor') + if( executorName == 'azurebatch' ) { + result.add(new AzBatchProcessObserver(session)) + } + + return result + } +} \ No newline at end of file diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy index 5d70466656..55df30bebe 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy @@ -976,6 +976,38 @@ class AzBatchService implements Closeable { apply(() -> client.deleteTask(key.jobId, key.taskId)) } + /** + * Set a specific Azure Batch job to terminate when all tasks complete. + * This is called eagerly when a Nextflow process completes (all tasks submitted) + * rather than waiting for the entire pipeline to finish. + * + * @param jobId The Azure Batch job ID to set for auto-termination + */ + void setJobAutoTermination(String jobId) { + try { + log.trace "Setting Azure job ${jobId} to terminate on completion" + + final job = apply(() -> client.getJob(jobId)) + final poolInfo = job.poolInfo + + final jobParameter = new BatchJobUpdateContent() + .setOnAllTasksComplete(OnAllBatchTasksComplete.TERMINATE_JOB) + .setPoolInfo(poolInfo) + + apply(() -> client.updateJob(jobId, jobParameter)) + } + catch (HttpResponseException e) { + if (e.response.statusCode == 409) { + log.debug "Azure Batch job ${jobId} already terminated, skipping auto-termination setup" + } else { + log.warn "Unable to set auto-termination for Azure Batch job ${jobId} - Status: ${e.response.statusCode}, Reason: ${e.message ?: e}" + } + } + catch (Exception e) { + log.warn "Unable to set auto-termination for Azure Batch job ${jobId} - Reason: ${e.message ?: e}" + } + } + /** * Set all jobs to terminate on completion. */ diff --git a/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchProcessObserverTest.groovy b/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchProcessObserverTest.groovy new file mode 100644 index 0000000000..767c0b3d97 --- /dev/null +++ b/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchProcessObserverTest.groovy @@ -0,0 +1,141 @@ +/* + * Copyright 2021, Microsoft Corp + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.cloud.azure.batch + +import nextflow.Session +import nextflow.cloud.azure.config.AzBatchOpts +import nextflow.cloud.azure.config.AzConfig +import nextflow.executor.Executor +import nextflow.processor.TaskProcessor +import spock.lang.Specification + +/** + * Test for AzBatchProcessObserver + * + * @author Adam Talbot + */ +class AzBatchProcessObserverTest extends Specification { + + def 'should skip non-azure batch executors'() { + given: + def session = Mock(Session) + def observer = new AzBatchProcessObserver(session) + def executor = Mock(Executor) // Not an AzBatchExecutor + def processor = Mock(TaskProcessor) { + getExecutor() >> executor + } + + when: + observer.onProcessTerminate(processor) + + then: + // No exception should be thrown and method should return early + noExceptionThrown() + } + + def 'should skip when termination is disabled'() { + given: + def session = Mock(Session) + def observer = new AzBatchProcessObserver(session) + def config = Mock(AzConfig) + def batchOpts = Mock(AzBatchOpts) { + terminateJobsOnCompletion >> false + } + def batchService = Mock(AzBatchService) { + getConfig() >> config + } + def executor = Mock(AzBatchExecutor) { + getBatchService() >> batchService + } + def processor = Mock(TaskProcessor) { + getExecutor() >> executor + getName() >> 'test-process' + } + config.batch() >> batchOpts + + when: + observer.onProcessTerminate(processor) + + then: + 0 * batchService.setJobAutoTermination(_) + } + + def 'should set job auto-termination when enabled'() { + given: + def session = Mock(Session) + def observer = new AzBatchProcessObserver(session) + def config = Mock(AzConfig) + def batchOpts = Mock(AzBatchOpts) { + terminateJobsOnCompletion >> true + } + def processor = Mock(TaskProcessor) { + getName() >> 'test-process' + } + def jobKey1 = new AzJobKey(Mock(TaskProcessor), 'pool1') + def jobKey2 = new AzJobKey(processor, 'pool1') // This one should match + def jobKey3 = new AzJobKey(Mock(TaskProcessor), 'pool2') + def allJobIds = [ + (jobKey1): 'job1', + (jobKey2): 'job2', // This should be processed + (jobKey3): 'job3' + ] + def batchService = Mock(AzBatchService) { + getConfig() >> config + getAllJobIds() >> allJobIds + } + def executor = Mock(AzBatchExecutor) { + getBatchService() >> batchService + } + processor.getExecutor() >> executor + config.batch() >> batchOpts + + when: + observer.onProcessTerminate(processor) + + then: + 1 * batchService.setJobAutoTermination('job2') + } + + def 'should handle exceptions gracefully'() { + given: + def session = Mock(Session) + def observer = new AzBatchProcessObserver(session) + def config = Mock(AzConfig) + def batchOpts = Mock(AzBatchOpts) { + terminateJobsOnCompletion >> true + } + def batchService = Mock(AzBatchService) { + getConfig() >> config + getAllJobIds() >> { throw new RuntimeException('Test error') } + } + def executor = Mock(AzBatchExecutor) { + getBatchService() >> batchService + } + def processor = Mock(TaskProcessor) { + getExecutor() >> executor + getName() >> 'test-process' + } + config.batch() >> batchOpts + + when: + observer.onProcessTerminate(processor) + + then: + // Should not throw exception + noExceptionThrown() + } +} \ No newline at end of file From 3cb359853b5b046ee15bdbc235d0f41f5ec0d579 Mon Sep 17 00:00:00 2001 From: adamrtalbot <12817534+adamrtalbot@users.noreply.github.com> Date: Wed, 4 Jun 2025 11:29:23 +0100 Subject: [PATCH 2/3] Correct copywright notice Signed-off-by: adamrtalbot <12817534+adamrtalbot@users.noreply.github.com> --- .../nextflow/cloud/azure/batch/AzBatchProcessObserver.groovy | 2 +- .../cloud/azure/batch/AzBatchProcessObserverFactory.groovy | 2 +- .../cloud/azure/batch/AzBatchProcessObserverTest.groovy | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserver.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserver.groovy index 16bad8b830..635684b62a 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserver.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserver.groovy @@ -1,5 +1,5 @@ /* - * Copyright 2021, Microsoft Corp + * Copyright 2025, Seqera * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserverFactory.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserverFactory.groovy index 1715a9afbc..29eace8a34 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserverFactory.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserverFactory.groovy @@ -1,5 +1,5 @@ /* - * Copyright 2021, Microsoft Corp + * Copyright 2025, Seqera * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchProcessObserverTest.groovy b/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchProcessObserverTest.groovy index 767c0b3d97..307f6e7586 100644 --- a/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchProcessObserverTest.groovy +++ b/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchProcessObserverTest.groovy @@ -1,5 +1,5 @@ /* - * Copyright 2021, Microsoft Corp + * Copyright 2025, Seqera * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 07c8a7050c5ca1423540bfafffdef5bc8ecc57d2 Mon Sep 17 00:00:00 2001 From: adamrtalbot <12817534+adamrtalbot@users.noreply.github.com> Date: Wed, 4 Jun 2025 11:33:23 +0100 Subject: [PATCH 3/3] Cody simplification and tidy Signed-off-by: adamrtalbot <12817534+adamrtalbot@users.noreply.github.com> --- .../azure/batch/AzBatchProcessObserver.groovy | 31 +++--- .../AzBatchProcessObserverFactory.groovy | 10 +- .../cloud/azure/batch/AzBatchService.groovy | 29 ++--- .../src/resources/META-INF/extensions.idx | 1 + .../batch/AzBatchProcessObserverTest.groovy | 100 +++++------------- 5 files changed, 50 insertions(+), 121 deletions(-) diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserver.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserver.groovy index 635684b62a..4d4cf01973 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserver.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserver.groovy @@ -34,16 +34,13 @@ import nextflow.trace.TraceObserver @CompileStatic class AzBatchProcessObserver implements TraceObserver { - private Session session - AzBatchProcessObserver(Session session) { - this.session = session + // Session not needed, but required by factory interface } /** * Called when a process terminates (all tasks have been submitted). - * This method will find the Azure Batch job ID associated with the process - * and set it to auto-terminate when all tasks complete. + * Sets Azure Batch jobs to auto-terminate when all tasks complete. */ @Override void onProcessTerminate(TaskProcessor processor) { @@ -53,25 +50,25 @@ class AzBatchProcessObserver implements TraceObserver { } final executor = processor.executor as AzBatchExecutor - final batchService = executor.getBatchService() + final batchService = executor.batchService + + // Check if auto-termination is enabled if( !batchService?.config?.batch()?.terminateJobsOnCompletion ) { log.trace "Azure Batch job auto-termination is disabled, skipping eager termination for process: ${processor.name}" return } - try { - // Find all job IDs associated with this processor - final jobIds = batchService.allJobIds.findAll { key, jobId -> - key.processor == processor - }.values() - - for( String jobId : jobIds ) { - log.debug "Setting Azure Batch job ${jobId} to auto-terminate for completed process: ${processor.name}" + // Find and set auto-termination for all jobs associated with this processor + batchService.allJobIds.findAll { key, jobId -> + key.processor == processor + }.values().each { jobId -> + log.debug "Setting Azure Batch job ${jobId} to auto-terminate for completed process: ${processor.name}" + try { batchService.setJobAutoTermination(jobId) } - } - catch( Exception e ) { - log.warn "Failed to set auto-termination for Azure Batch jobs associated with process '${processor.name}' - Reason: ${e.message ?: e}" + catch( Exception e ) { + log.warn "Failed to set auto-termination for Azure Batch job ${jobId} associated with process '${processor.name}' - ${e.message ?: e}" + } } } } \ No newline at end of file diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserverFactory.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserverFactory.groovy index 29eace8a34..87b2474aee 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserverFactory.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchProcessObserverFactory.groovy @@ -32,14 +32,6 @@ class AzBatchProcessObserverFactory implements TraceObserverFactory { @Override Collection create(Session session) { - final result = new ArrayList() - - // Only create the observer if using Azure Batch executor - final executorName = session.config.navigate('process.executor') - if( executorName == 'azurebatch' ) { - result.add(new AzBatchProcessObserver(session)) - } - - return result + return [new AzBatchProcessObserver(session)] } } \ No newline at end of file diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy index 55df30bebe..cfafd38edc 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy @@ -984,6 +984,15 @@ class AzBatchService implements Closeable { * @param jobId The Azure Batch job ID to set for auto-termination */ void setJobAutoTermination(String jobId) { + setJobTermination(jobId) + } + + /** + * Set a job to terminate when all tasks complete. + * + * @param jobId The Azure Batch job ID to set for auto-termination + */ + protected void setJobTermination(String jobId) { try { log.trace "Setting Azure job ${jobId} to terminate on completion" @@ -1013,25 +1022,7 @@ class AzBatchService implements Closeable { */ protected void terminateJobs() { for( String jobId : allJobIds.values() ) { - try { - log.trace "Setting Azure job ${jobId} to terminate on completion" - - final job = apply(() -> client.getJob(jobId)) - final poolInfo = job.poolInfo - - final jobParameter = new BatchJobUpdateContent() - .setOnAllTasksComplete(OnAllBatchTasksComplete.TERMINATE_JOB) - .setPoolInfo(poolInfo) - - apply(() -> client.updateJob(jobId, jobParameter)) - } - catch (HttpResponseException e) { - if (e.response.statusCode == 409) { - log.debug "Azure Batch job ${jobId} already terminated, skipping termination" - } else { - log.warn "Unable to terminate Azure Batch job ${jobId} - Status: ${e.response.statusCode}, Reason: ${e.message ?: e}" - } - } + setJobTermination(jobId) } } diff --git a/plugins/nf-azure/src/resources/META-INF/extensions.idx b/plugins/nf-azure/src/resources/META-INF/extensions.idx index 685c239bca..c9897c5471 100644 --- a/plugins/nf-azure/src/resources/META-INF/extensions.idx +++ b/plugins/nf-azure/src/resources/META-INF/extensions.idx @@ -15,6 +15,7 @@ # nextflow.cloud.azure.batch.AzBatchExecutor +nextflow.cloud.azure.batch.AzBatchProcessObserverFactory nextflow.cloud.azure.file.AzPathFactory nextflow.cloud.azure.file.AzPathSerializer nextflow.cloud.azure.fusion.AzFusionEnv diff --git a/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchProcessObserverTest.groovy b/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchProcessObserverTest.groovy index 307f6e7586..1427651f47 100644 --- a/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchProcessObserverTest.groovy +++ b/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchProcessObserverTest.groovy @@ -30,112 +30,60 @@ import spock.lang.Specification */ class AzBatchProcessObserverTest extends Specification { - def 'should skip non-azure batch executors'() { + def 'should only act on Azure Batch executors'() { given: - def session = Mock(Session) - def observer = new AzBatchProcessObserver(session) - def executor = Mock(Executor) // Not an AzBatchExecutor + def observer = new AzBatchProcessObserver(Mock(Session)) def processor = Mock(TaskProcessor) { - getExecutor() >> executor + getExecutor() >> Mock(Executor) // Not an AzBatchExecutor } when: observer.onProcessTerminate(processor) then: - // No exception should be thrown and method should return early noExceptionThrown() } - def 'should skip when termination is disabled'() { - given: - def session = Mock(Session) - def observer = new AzBatchProcessObserver(session) - def config = Mock(AzConfig) - def batchOpts = Mock(AzBatchOpts) { - terminateJobsOnCompletion >> false - } - def batchService = Mock(AzBatchService) { - getConfig() >> config - } - def executor = Mock(AzBatchExecutor) { - getBatchService() >> batchService - } - def processor = Mock(TaskProcessor) { - getExecutor() >> executor - getName() >> 'test-process' - } - config.batch() >> batchOpts - - when: - observer.onProcessTerminate(processor) - - then: - 0 * batchService.setJobAutoTermination(_) - } - def 'should set job auto-termination when enabled'() { given: - def session = Mock(Session) - def observer = new AzBatchProcessObserver(session) - def config = Mock(AzConfig) - def batchOpts = Mock(AzBatchOpts) { - terminateJobsOnCompletion >> true - } - def processor = Mock(TaskProcessor) { - getName() >> 'test-process' - } - def jobKey1 = new AzJobKey(Mock(TaskProcessor), 'pool1') - def jobKey2 = new AzJobKey(processor, 'pool1') // This one should match - def jobKey3 = new AzJobKey(Mock(TaskProcessor), 'pool2') - def allJobIds = [ - (jobKey1): 'job1', - (jobKey2): 'job2', // This should be processed - (jobKey3): 'job3' - ] + def observer = new AzBatchProcessObserver(Mock(Session)) + def processor = Mock(TaskProcessor) { getName() >> 'test-process' } def batchService = Mock(AzBatchService) { - getConfig() >> config - getAllJobIds() >> allJobIds - } - def executor = Mock(AzBatchExecutor) { - getBatchService() >> batchService + getConfig() >> Mock(AzConfig) { + batch() >> Mock(AzBatchOpts) { + terminateJobsOnCompletion >> true + } + } + getAllJobIds() >> [(new AzJobKey(processor, 'pool1')): 'job123'] } + def executor = Mock(AzBatchExecutor) { getBatchService() >> batchService } processor.getExecutor() >> executor - config.batch() >> batchOpts when: observer.onProcessTerminate(processor) then: - 1 * batchService.setJobAutoTermination('job2') + 1 * batchService.setJobAutoTermination('job123') } - def 'should handle exceptions gracefully'() { + def 'should skip when termination disabled'() { given: - def session = Mock(Session) - def observer = new AzBatchProcessObserver(session) - def config = Mock(AzConfig) - def batchOpts = Mock(AzBatchOpts) { - terminateJobsOnCompletion >> true - } + def observer = new AzBatchProcessObserver(Mock(Session)) + def processor = Mock(TaskProcessor) { getName() >> 'test-process' } def batchService = Mock(AzBatchService) { - getConfig() >> config - getAllJobIds() >> { throw new RuntimeException('Test error') } + getConfig() >> Mock(AzConfig) { + batch() >> Mock(AzBatchOpts) { + terminateJobsOnCompletion >> false + } + } } - def executor = Mock(AzBatchExecutor) { - getBatchService() >> batchService - } - def processor = Mock(TaskProcessor) { - getExecutor() >> executor - getName() >> 'test-process' - } - config.batch() >> batchOpts + def executor = Mock(AzBatchExecutor) { getBatchService() >> batchService } + processor.getExecutor() >> executor when: observer.onProcessTerminate(processor) then: - // Should not throw exception - noExceptionThrown() + 0 * batchService.setJobAutoTermination(_) } } \ No newline at end of file