From be14ac2e851382cadbb597c90731177e6e384fc7 Mon Sep 17 00:00:00 2001 From: jorgee Date: Tue, 17 Dec 2024 14:29:39 +0100 Subject: [PATCH 1/9] Initial publish dir offloading implementation Signed-off-by: jorgee --- .../src/main/groovy/nextflow/Session.groovy | 22 +++ .../nextflow/executor/ExecutorFactory.groovy | 16 +++ .../nextflow/processor/PublishDir.groovy | 26 ++-- .../processor/PublishOffloadManager.groovy | 130 ++++++++++++++++++ .../nextflow/processor/TaskProcessor.groovy | 2 +- .../nextflow/script/ScriptRunner.groovy | 1 + .../PublishOffloadManagerTest.groovy | 38 +++++ 7 files changed, 226 insertions(+), 9 deletions(-) create mode 100644 modules/nextflow/src/main/groovy/nextflow/processor/PublishOffloadManager.groovy create mode 100644 modules/nextflow/src/test/groovy/nextflow/processor/PublishOffloadManagerTest.groovy diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index f394245259..56835a9948 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -16,6 +16,7 @@ package nextflow +import nextflow.processor.PublishOffloadManager import java.nio.file.Files import java.nio.file.Path @@ -293,6 +294,12 @@ class Session implements ISession { FilePorter getFilePorter() { filePorter } + boolean publishOffload + + private PublishOffloadManager publishOffloadManager + + PublishOffloadManager getPublishOffloadManager() {publishOffloadManager} + /** * Creates a new session with an 'empty' (default) configuration */ @@ -394,6 +401,21 @@ class Session implements ISession { // -- file porter config this.filePorter = new FilePorter(this) + this.publishOffload = config.publishOffload as boolean + + if ( this.publishOffload ) { + // -- publish offload manager config + log.warn("Publish offload flag enabled. Creating Offload Manager") + this.publishOffloadManager = new PublishOffloadManager(this) + } + + } + + void startPublishOffloadManager() { + if ( this.publishOffload ) { + log.debug("Starting Publish offload manager") + this.publishOffloadManager?.init() + } } protected Path cloudCachePath(Map cloudcache, Path workDir) { diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/ExecutorFactory.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/ExecutorFactory.groovy index 57a1535b6c..61ec55087b 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/ExecutorFactory.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/ExecutorFactory.groovy @@ -234,6 +234,22 @@ class ExecutorFactory { return result } + @CompileDynamic + static String getDefaultExecutorName(Session session) { + def result = session.config.process?.executor?.toString() + if( !result ) { + if (session.config.executor instanceof String) { + return session.config.executor + } else if (session.config.executor?.name instanceof String) { + return session.config.executor.name + } else { + return DEFAULT_EXECUTOR + } + } + return result + } + + void signalExecutors() { for( Executor exec : executors.values() ) exec.signal() diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index 4ca5764519..c35d51dd9d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -409,9 +409,9 @@ class PublishDir { // create target dirs if required makeDirs(destination.parent) - + def offload = false try { - processFileImpl(source, destination) + offload = processFileImpl(source, destination) } catch( FileAlreadyExistsException e ) { // don't copy source path if target is identical, but still emit the publish event @@ -425,11 +425,12 @@ class PublishDir { if( !sameRealPath && shouldOverwrite(source, destination) ) { FileHelper.deletePath(destination) - processFileImpl(source, destination) + offload = processFileImpl(source, destination) } } - - notifyFilePublish(destination, source) + //Don't notify if file publication is offloaded. It will be notified after the offloaded job is finished. + if (!offload) + notifyFilePublish(destination, source) } private String real0(Path p) { @@ -495,7 +496,7 @@ class PublishDir { return sourceHash != targetHash } - protected void processFileImpl( Path source, Path destination ) { + protected boolean processFileImpl( Path source, Path destination ) { log.trace "publishing file: $source -[$mode]-> $destination" if( !mode || mode == Mode.SYMLINK ) { @@ -509,10 +510,18 @@ class PublishDir { FilesEx.mklink(source, [hard:true], destination) } else if( mode == Mode.MOVE ) { - FileHelper.movePath(source, destination) + if ( session.getPublishOffloadManager().tryMoveOffload(source, destination) ){ + return true + } else { + FileHelper.movePath(source, destination) + } } else if( mode == Mode.COPY ) { - FileHelper.copyPath(source, destination) + if ( session.getPublishOffloadManager().tryCopyOffload(source, destination) ){ + return true + } else { + FileHelper.copyPath(source, destination) + } } else if( mode == Mode.COPY_NO_FOLLOW ) { FileHelper.copyPath(source, destination, LinkOption.NOFOLLOW_LINKS) @@ -520,6 +529,7 @@ class PublishDir { else { throw new IllegalArgumentException("Unknown file publish mode: ${mode}") } + return false } protected void createPublishDir() { diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishOffloadManager.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishOffloadManager.groovy new file mode 100644 index 0000000000..3cdc58e415 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishOffloadManager.groovy @@ -0,0 +1,130 @@ +package nextflow.processor + +import groovy.util.logging.Slf4j +import nextflow.Nextflow +import nextflow.Session +import nextflow.executor.Executor +import nextflow.executor.ExecutorFactory +import nextflow.extension.FilesEx +import nextflow.fusion.FusionHelper +import nextflow.script.BaseScript +import nextflow.script.BodyDef +import nextflow.script.ProcessConfig +import nextflow.script.TokenValCall +import nextflow.util.ArrayTuple + +import java.nio.file.Path + +@Slf4j +class PublishOffloadManager { + Map runningPublications= new HashMap(10) + static final Map SUPPORTED_SCHEMES = [awsbatch:['s3'], local:['file']] + static final String S5CMD_CONTAINER = 'jorgeejarquea/s5cmd_aws:0.0.1' + Session session + PublishTaskProcessor copyProcessor + PublishTaskProcessor moveProcessor + + PublishOffloadManager(Session session) { + this.session = session + } + + void init(){ + + if (useS5cmd()){ + this.copyProcessor = createProcessor( "publish_dir_copy_process", new BodyDef({"s5cmd cp $source $target"},'copy data process') ) + this.moveProcessor = createProcessor( "publish_dir_move_process", new BodyDef({"s5cmd mv $source $target"},'move data process') ) + } else { + this.copyProcessor = createProcessor( "publish_dir_copy_process", new BodyDef({"cp $source $target"},'copy data process') ) + this.moveProcessor = createProcessor( "publish_dir_move_process", new BodyDef({"mv $source $target"},'move data process') ) + } + + } + + private boolean checkOffload(Path source, Path destination, String executor){ + return session.publishOffload && source.scheme in SUPPORTED_SCHEMES[executor] && destination.scheme in SUPPORTED_SCHEMES[executor]; + } + + private synchronized boolean tryInvokeProcessor(TaskProcessor processor, Path origin, Path destination){ + if (checkOffload(origin, destination, processor.executor.name)) { + final params = new TaskStartParams(TaskId.next(), processor.indexCount.incrementAndGet()) + final values = new ArrayList(1) + log.debug("Creating task for file publication: ${origin.toUri().toString()} -> ${destination.toUri().toString()} " ) + values[0] = generateFileValues(origin, destination) + final args = new ArrayList(2) + args[0] = params + args[1] = values + assert args.size() == 2 + processor.invokeTask(args.toArray()) + runningPublications.put(processor.currentTask.get(), Nextflow.tuple(origin, destination)) + return true + } + return false + } + + private isFusionEnabled(){ + return FusionHelper.isFusionEnabled(session) + } + + private useS5cmd(){ + return ( (!isFusionEnabled()) && (ExecutorFactory.getDefaultExecutorName(session) == 'awsbatch') ) + } + + private ArrayTuple generateFileValues(Path origin, Path destination){ + if ( isFusionEnabled() ){ + Nextflow.tuple(FusionHelper.toContainerMount(origin), FusionHelper.toContainerMount(destination)) + } else { + Nextflow.tuple(FilesEx.toUriString(origin), FilesEx.toUriString(destination)) + } + } + + boolean tryMoveOffload(Path origin, Path destination) { + tryInvokeProcessor(moveProcessor, origin, destination) + } + + boolean tryCopyOffload(Path origin, Path destination) { + tryInvokeProcessor(copyProcessor, origin, destination) + } + + private PublishTaskProcessor createProcessor( String name, BodyDef body){ + assert body + assert session.script + log.debug("Creating processor $name") + // -- the config object + final processConfig = new ProcessConfig(session.script, name) + // Invoke the code block which will return the script closure to the executed. + // As side effect will set all the property declarations in the 'taskConfig' object. + if (useS5cmd()) { + processConfig.put('container', S5CMD_CONTAINER); + } + processConfig._in_tuple(new TokenValCall('source'), new TokenValCall('target')) + + if ( !body ) + throw new IllegalArgumentException("Missing script in the specified process block -- make sure it terminates with the script string to be executed") + + // -- apply settings from config file to process config + processConfig.applyConfig((Map)session.config.process, name, name, name) + + // -- get the executor for the given process config + final execObj = session.executorFactory.getExecutor(name, processConfig, body, session) + + // -- create processor class + new PublishTaskProcessor( name, execObj, session, session.script, processConfig, body, this ) + } + +} + +class PublishTaskProcessor extends TaskProcessor{ + + PublishOffloadManager manager + + PublishTaskProcessor(String name, Executor executor, Session session, BaseScript baseScript, ProcessConfig processConfig, BodyDef bodyDef, PublishOffloadManager manager) { + super(name, executor, session, baseScript, processConfig, bodyDef) + this.manager = manager + } + + @Override + void finalizeTask0(TaskRun task){ + final tuple = manager.runningPublications.remove(task) + session.notifyFilePublish((Path)tuple.get(0), (Path)tuple.get(1)) + } +} diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index 4b5fbf2791..8c0a8cddc6 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -2422,7 +2422,7 @@ class TaskProcessor { * @param task The {@code TaskRun} instance to finalize * @param producedFiles The map of files to be bind the outputs */ - private void finalizeTask0( TaskRun task ) { + protected void finalizeTask0( TaskRun task ) { log.trace "Finalize process > ${safeTaskName(task)}" // -- bind output (files) diff --git a/modules/nextflow/src/main/groovy/nextflow/script/ScriptRunner.groovy b/modules/nextflow/src/main/groovy/nextflow/script/ScriptRunner.groovy index 498d50f41f..d9224ea3e0 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/ScriptRunner.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/ScriptRunner.groovy @@ -242,6 +242,7 @@ class ScriptRunner { protected run() { log.debug "> Launching execution" assert scriptParser, "Missing script instance to run" + session.startPublishOffloadManager() // -- launch the script execution scriptParser.runScript() // -- normalise output diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/PublishOffloadManagerTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/PublishOffloadManagerTest.groovy new file mode 100644 index 0000000000..92c93178d3 --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/processor/PublishOffloadManagerTest.groovy @@ -0,0 +1,38 @@ +package nextflow.processor + +import nextflow.Session +import nextflow.script.BaseScript +import nextflow.script.ScriptBinding +import nextflow.script.ScriptFile +import spock.lang.Specification + +import test.TestHelper + + +class PublishOffloadManagerTest extends Specification { + + def 'should create task processor'(){ + given: + def session = new Session(); + def scriptBinding = new ScriptBinding(session: session) + def script = Stub(BaseScript) + script.getBinding() >> scriptBinding + def folder = TestHelper.createInMemTempDir() + def file = folder.resolve('pipeline.nf'); file.text = 'println "hello"' + def scriptFile = new ScriptFile(file) + session.init(scriptFile) + //session.start() + session.script = script; + def poManager = new PublishOffloadManager(session); + when: + poManager.init() + then: + poManager.copyProcessor != null + poManager.moveProcessor != null + cleanup: + session.classesDir?.deleteDir() + + + } + +} From 1f0dfec401391981e77d685b89eaf04fa091469b Mon Sep 17 00:00:00 2001 From: jorgee Date: Tue, 17 Dec 2024 15:00:27 +0100 Subject: [PATCH 2/9] fix unit test Signed-off-by: jorgee --- .../src/main/groovy/nextflow/processor/PublishDir.groovy | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index c35d51dd9d..5bdd88bc0c 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -510,14 +510,14 @@ class PublishDir { FilesEx.mklink(source, [hard:true], destination) } else if( mode == Mode.MOVE ) { - if ( session.getPublishOffloadManager().tryMoveOffload(source, destination) ){ + if ( session.getPublishOffloadManager()?.tryMoveOffload(source, destination) ){ return true } else { FileHelper.movePath(source, destination) } } else if( mode == Mode.COPY ) { - if ( session.getPublishOffloadManager().tryCopyOffload(source, destination) ){ + if ( session.getPublishOffloadManager()?.tryCopyOffload(source, destination) ){ return true } else { FileHelper.copyPath(source, destination) From ed27414ca8be5062444ab42ab29c749f37e622ef Mon Sep 17 00:00:00 2001 From: jorgee Date: Tue, 17 Dec 2024 16:02:10 +0100 Subject: [PATCH 3/9] add integration test Signed-off-by: jorgee --- tests/checks/publish-offload.nf/.checks | 7 +++++++ tests/checks/publish-offload.nf/.config | 1 + tests/publish-offload.nf | 18 ++++++++++++++++++ 3 files changed, 26 insertions(+) create mode 100644 tests/checks/publish-offload.nf/.checks create mode 100644 tests/checks/publish-offload.nf/.config create mode 100644 tests/publish-offload.nf diff --git a/tests/checks/publish-offload.nf/.checks b/tests/checks/publish-offload.nf/.checks new file mode 100644 index 0000000000..e53b74e04b --- /dev/null +++ b/tests/checks/publish-offload.nf/.checks @@ -0,0 +1,7 @@ +set -e +cp .config nextflow.config + +$NXF_RUN > stdout +[[ `grep 'INFO' .nextflow.log | grep -c 'Submitted process > gen_data'` == 1 ]] || false +[[ `grep 'INFO' .nextflow.log | grep -c 'Submitted process > publish_dir_copy_process'` == 1 ]] || false +[[ -f publishDir/chunk_1 ]] || false diff --git a/tests/checks/publish-offload.nf/.config b/tests/checks/publish-offload.nf/.config new file mode 100644 index 0000000000..1840185eb4 --- /dev/null +++ b/tests/checks/publish-offload.nf/.config @@ -0,0 +1 @@ +publishOffload = true diff --git a/tests/publish-offload.nf b/tests/publish-offload.nf new file mode 100644 index 0000000000..f9e682392d --- /dev/null +++ b/tests/publish-offload.nf @@ -0,0 +1,18 @@ +params.outdir = 'publishDir' + +process gen_data { + publishDir "${params.outdir}", mode: 'copy' + input: + val(i) + output: + path 'chunk_*' + + script: + """ + dd if=/dev/urandom of=chunk_$i count=1 bs=10M + """ +} + +workflow { + Channel.of(1) | gen_data +} From 5b9fe688e7b38b868390d45f740cd04ce38375f3 Mon Sep 17 00:00:00 2001 From: jorgee Date: Tue, 17 Dec 2024 18:22:47 +0100 Subject: [PATCH 4/9] add header Signed-off-by: jorgee --- .../processor/PublishOffloadManager.groovy | 16 ++++++++++++++++ .../processor/PublishOffloadManagerTest.groovy | 16 ++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishOffloadManager.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishOffloadManager.groovy index 3cdc58e415..9881e441b7 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishOffloadManager.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishOffloadManager.groovy @@ -1,3 +1,19 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package nextflow.processor import groovy.util.logging.Slf4j diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/PublishOffloadManagerTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/PublishOffloadManagerTest.groovy index 92c93178d3..cde87e0ff3 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/PublishOffloadManagerTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/PublishOffloadManagerTest.groovy @@ -1,3 +1,19 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package nextflow.processor import nextflow.Session From 0f394ec5513c42cdfc42e9b4d76a318d778cb61f Mon Sep 17 00:00:00 2001 From: Jorge Ejarque Date: Fri, 20 Dec 2024 11:48:14 +0100 Subject: [PATCH 5/9] Update modules/nextflow/src/main/groovy/nextflow/Session.groovy [skip ci] Co-authored-by: Paolo Di Tommaso Signed-off-by: Jorge Ejarque --- modules/nextflow/src/main/groovy/nextflow/Session.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index 56835a9948..f2e93d6133 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -298,7 +298,7 @@ class Session implements ISession { private PublishOffloadManager publishOffloadManager - PublishOffloadManager getPublishOffloadManager() {publishOffloadManager} + PublishOffloadManager getPublishOffloadManager() { publishOffloadManager } /** * Creates a new session with an 'empty' (default) configuration From 832834f6ffdcdef2af201f9f0d6a6f4a263d5fc2 Mon Sep 17 00:00:00 2001 From: jorgee Date: Fri, 20 Dec 2024 20:13:27 +0100 Subject: [PATCH 6/9] group copies and retries Signed-off-by: jorgee --- .../src/main/groovy/nextflow/Session.groovy | 20 ++- .../nextflow/processor/PublishDir.groovy | 4 +- .../processor/PublishOffloadManager.groovy | 159 +++++++++++++----- .../nextflow/processor/copy-group-template.sh | 72 ++++++++ .../PublishOffloadManagerTest.groovy | 3 +- tests/checks/publish-offload.nf/.config | 2 +- 6 files changed, 205 insertions(+), 55 deletions(-) create mode 100644 modules/nextflow/src/main/resources/nextflow/processor/copy-group-template.sh diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index f2e93d6133..6c5485f6a0 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -294,7 +294,7 @@ class Session implements ISession { FilePorter getFilePorter() { filePorter } - boolean publishOffload + int publishOffloadBatchSize private PublishOffloadManager publishOffloadManager @@ -401,9 +401,9 @@ class Session implements ISession { // -- file porter config this.filePorter = new FilePorter(this) - this.publishOffload = config.publishOffload as boolean + this.publishOffloadBatchSize = config.publishOffloadBatchSize ? config.publishOffloadBatchSize as int : 0 - if ( this.publishOffload ) { + if ( this.publishOffloadBatchSize ) { // -- publish offload manager config log.warn("Publish offload flag enabled. Creating Offload Manager") this.publishOffloadManager = new PublishOffloadManager(this) @@ -412,7 +412,7 @@ class Session implements ISession { } void startPublishOffloadManager() { - if ( this.publishOffload ) { + if ( this.publishOffloadBatchSize ) { log.debug("Starting Publish offload manager") this.publishOffloadManager?.init() } @@ -698,13 +698,19 @@ class Session implements ISession { log.debug "Session await" processesBarrier.awaitCompletion() log.debug "Session await > all processes finished" - terminated = true - monitorsBarrier.awaitCompletion() - log.debug "Session await > all barriers passed" + + if( !aborted ) { joinAllOperators() log.trace "Session > all operators finished" } + // shutdown t + publishPoolManager?.shutdown(false) + publishOffloadManager?.close() + log.debug "Session await > Publish phase finished" + terminated = true + monitorsBarrier.awaitCompletion() + log.debug "Session await > all barriers passed" } void destroy() { diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index 5bdd88bc0c..24284a7806 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -510,14 +510,14 @@ class PublishDir { FilesEx.mklink(source, [hard:true], destination) } else if( mode == Mode.MOVE ) { - if ( session.getPublishOffloadManager()?.tryMoveOffload(source, destination) ){ + if ( session.getPublishOffloadManager()?.tryMoveOffload(source, destination, retryConfig, failOnError) ){ return true } else { FileHelper.movePath(source, destination) } } else if( mode == Mode.COPY ) { - if ( session.getPublishOffloadManager()?.tryCopyOffload(source, destination) ){ + if ( session.getPublishOffloadManager()?.tryCopyOffload(source, destination, retryConfig, failOnError) ){ return true } else { FileHelper.copyPath(source, destination) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishOffloadManager.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishOffloadManager.groovy index 9881e441b7..01aed46d6c 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishOffloadManager.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishOffloadManager.groovy @@ -16,6 +16,7 @@ package nextflow.processor +import groovy.transform.PackageScope import groovy.util.logging.Slf4j import nextflow.Nextflow import nextflow.Session @@ -26,52 +27,65 @@ import nextflow.fusion.FusionHelper import nextflow.script.BaseScript import nextflow.script.BodyDef import nextflow.script.ProcessConfig -import nextflow.script.TokenValCall import nextflow.util.ArrayTuple import java.nio.file.Path +import java.util.concurrent.atomic.AtomicInteger @Slf4j class PublishOffloadManager { - Map runningPublications= new HashMap(10) + Map runningPublications= new HashMap(10) static final Map SUPPORTED_SCHEMES = [awsbatch:['s3'], local:['file']] static final String S5CMD_CONTAINER = 'jorgeejarquea/s5cmd_aws:0.0.1' - Session session - PublishTaskProcessor copyProcessor - PublishTaskProcessor moveProcessor + static final String PUBLISH_FUNCTION = 'nxf_publish' + private Session session + private PublishTaskProcessor publishProcessor + private List commands = new LinkedList(); + private boolean closed = false; + /** + * Unique offloaded index number + */ + final protected AtomicInteger indexCount = new AtomicInteger() PublishOffloadManager(Session session) { - this.session = session + this.session = session; } + @PackageScope + TaskProcessor getPublishProcessor(){ publishProcessor } void init(){ - - if (useS5cmd()){ - this.copyProcessor = createProcessor( "publish_dir_copy_process", new BodyDef({"s5cmd cp $source $target"},'copy data process') ) - this.moveProcessor = createProcessor( "publish_dir_move_process", new BodyDef({"s5cmd mv $source $target"},'move data process') ) - } else { - this.copyProcessor = createProcessor( "publish_dir_copy_process", new BodyDef({"cp $source $target"},'copy data process') ) - this.moveProcessor = createProcessor( "publish_dir_move_process", new BodyDef({"mv $source $target"},'move data process') ) - } + //Try with template + BodyDef body = new BodyDef({ + def file = PublishOffloadManager.class.getResource('copy-group-template.sh') + return file.text + },'publish file process', 'shell') + this.publishProcessor = createProcessor( "publish_process", body ) } private boolean checkOffload(Path source, Path destination, String executor){ - return session.publishOffload && source.scheme in SUPPORTED_SCHEMES[executor] && destination.scheme in SUPPORTED_SCHEMES[executor]; - } - - private synchronized boolean tryInvokeProcessor(TaskProcessor processor, Path origin, Path destination){ - if (checkOffload(origin, destination, processor.executor.name)) { - final params = new TaskStartParams(TaskId.next(), processor.indexCount.incrementAndGet()) - final values = new ArrayList(1) - log.debug("Creating task for file publication: ${origin.toUri().toString()} -> ${destination.toUri().toString()} " ) - values[0] = generateFileValues(origin, destination) - final args = new ArrayList(2) - args[0] = params - args[1] = values - assert args.size() == 2 - processor.invokeTask(args.toArray()) - runningPublications.put(processor.currentTask.get(), Nextflow.tuple(origin, destination)) + return session.publishOffloadBatchSize > 0 && source.scheme in SUPPORTED_SCHEMES[executor] && destination.scheme in SUPPORTED_SCHEMES[executor]; + } + + private void invokeProcessor(inputValue) { + final params = new TaskStartParams(TaskId.next(), publishProcessor.indexCount.incrementAndGet()) + final values = new ArrayList(1) + values[0] = inputValue + final args = new ArrayList(2) + args[0] = params + args[1] = values + publishProcessor.invokeTask(args.toArray()) + } + + private synchronized boolean tryOffload(String command, Path origin, Path destination, PublishRetryConfig retryConfig, boolean failonError){ + if (checkOffload(origin, destination, publishProcessor.executor.name)) { + final id = indexCount.incrementAndGet() + runningPublications.put(id, Nextflow.tuple(origin, destination, failonError)) + commands.add(generateExecutionCommand(id, command, origin, destination, retryConfig)) + if (commands.size() == session.publishOffloadBatchSize){ + invokeProcessor(commands.join(";")) + commands.clear() + } return true } return false @@ -85,20 +99,33 @@ class PublishOffloadManager { return ( (!isFusionEnabled()) && (ExecutorFactory.getDefaultExecutorName(session) == 'awsbatch') ) } - private ArrayTuple generateFileValues(Path origin, Path destination){ - if ( isFusionEnabled() ){ - Nextflow.tuple(FusionHelper.toContainerMount(origin), FusionHelper.toContainerMount(destination)) + private String generateExecutionCommand(Integer id, String command, Path origin, Path destination, PublishRetryConfig retryConfig){ + return "$PUBLISH_FUNCTION ${retryConfig.maxAttempts} ${retryConfig.delay.toMillis()} ${retryConfig.jitter} ${retryConfig.maxDelay.toMillis()} " + + "$id $command ${convertFilePath(origin)} ${convertFilePath(destination)}" + } + + private String convertFilePath(Path path) { + if (isFusionEnabled()) { + return FusionHelper.toContainerMount(path) } else { - Nextflow.tuple(FilesEx.toUriString(origin), FilesEx.toUriString(destination)) + return FilesEx.toUriString(path) } } - boolean tryMoveOffload(Path origin, Path destination) { - tryInvokeProcessor(moveProcessor, origin, destination) + boolean tryMoveOffload(Path origin, Path destination, PublishRetryConfig retryConfig, boolean failonError) { + String command = 'mv' + if ( useS5cmd() ) { + command = 's5cmd mv' + } + tryOffload(command, origin, destination, retryConfig, failonError) } - boolean tryCopyOffload(Path origin, Path destination) { - tryInvokeProcessor(copyProcessor, origin, destination) + boolean tryCopyOffload(Path origin, Path destination, PublishRetryConfig retryConfig, boolean failonError) { + String command = 'cp' + if ( useS5cmd() ) { + command = 's5cmd cp' + } + tryOffload(command, origin, destination, retryConfig, failonError) } private PublishTaskProcessor createProcessor( String name, BodyDef body){ @@ -112,8 +139,8 @@ class PublishOffloadManager { if (useS5cmd()) { processConfig.put('container', S5CMD_CONTAINER); } - processConfig._in_tuple(new TokenValCall('source'), new TokenValCall('target')) - + processConfig._in_val('executions') + processConfig._out_stdout() if ( !body ) throw new IllegalArgumentException("Missing script in the specified process block -- make sure it terminates with the script string to be executed") @@ -127,10 +154,18 @@ class PublishOffloadManager { new PublishTaskProcessor( name, execObj, session, session.script, processConfig, body, this ) } + synchronized void close() { + closed=true + if ( commands.size() ){ + invokeProcessor(commands.join(";")) + commands.clear() + } + + } } +@Slf4j class PublishTaskProcessor extends TaskProcessor{ - PublishOffloadManager manager PublishTaskProcessor(String name, Executor executor, Session session, BaseScript baseScript, ProcessConfig processConfig, BodyDef bodyDef, PublishOffloadManager manager) { @@ -139,8 +174,46 @@ class PublishTaskProcessor extends TaskProcessor{ } @Override - void finalizeTask0(TaskRun task){ - final tuple = manager.runningPublications.remove(task) - session.notifyFilePublish((Path)tuple.get(0), (Path)tuple.get(1)) + void finalizeTask0(TaskRun task) { + if( task.outputs.size() == 1 ){ + def value = task.outputs.values().first() + value = value instanceof Path ? value.text : value?.toString() + for ( String finishedCopy : value.split('\n') ){ + final result = finishedCopy.split(":") + if (result.size() == 2) { + final id = result[0] as Integer + final tuple = manager.runningPublications.remove(id) + final exitcode = result[1] as Integer + if( exitcode == 0 ){ + session.notifyFilePublish((Path) tuple.get(0), (Path) tuple.get(1)) + } else { + if (tuple.get(2) as Boolean) { + log.error("Publication of file ${tuple.get(0)} -> ${tuple.get(1)} failed.") + } else { + log.warn("Publication of file ${tuple.get(0)} -> ${tuple.get(1)} failed.") + } + printPublishTaskError(task) + } + } + } + } else { + log.error("Incorrect number of outputs in the publish task"); + } + } + + private void printPublishTaskError(TaskRun task){ + final List message = [] + final max = 50 + final lines = task.dumpStderr(max) + message << "Executed publish task:" + if( lines ) { + message << "\nCommand error:" + for( String it : lines ) { + message << " ${stripWorkDir(it, task.workDir)}" + } + } + if( task?.workDir ) + message << "\nWork dir:\n ${task.workDirStr}" + log.debug(message.join('\n')) } } diff --git a/modules/nextflow/src/main/resources/nextflow/processor/copy-group-template.sh b/modules/nextflow/src/main/resources/nextflow/processor/copy-group-template.sh new file mode 100644 index 0000000000..e9ba4d45cb --- /dev/null +++ b/modules/nextflow/src/main/resources/nextflow/processor/copy-group-template.sh @@ -0,0 +1,72 @@ +## +## Copyright 2013-2024, Seqera Labs +## +## Licensed under the Apache License, Version 2.0 (the "License"); +## you may not use this file except in compliance with the License. +## You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## +set +e +export LC_NUMERIC=C + +nxf_publish() { + local max_retries=$1 + local delay_ms=$2 + local jitter_factor=$3 + local max_delay_ms=$4 + local id=$5 + shift 5 + local attempt=1 + + while true; do + "$@" && echo $id:0 && return # Run the command; if it succeeds, exit the function + + if (( attempt >= max_retries )); then + echo "Command failed after $attempt attempts." >&2 + echo $id:1 + return + fi + + # Calculate jitter as a factor of the delay + local min_factor=$(bc <<< "1 - $jitter_factor") + local max_factor=$(bc <<< "1 + $jitter_factor") + local random_factor=$(awk -v min="$min_factor" -v max="$max_factor" 'BEGIN{srand(); print min + (max-min)*rand()}') + + # Apply the jitter factor to the delay + local jittered_delay_ms=$(bc <<< "$delay_ms * $random_factor") + + # Ensure the delay does not exceed the maximum delay + if (( $(bc <<< "$jittered_delay_ms > $max_delay_ms") == 1 )); then + jittered_delay_ms=$max_delay_ms + fi + + # Convert milliseconds to seconds for sleep + local total_delay_sec=$(bc <<< "scale=3; $jittered_delay_ms / 1000") + + echo "Command failed. Retrying in $total_delay_sec seconds..." >&2 + sleep "$total_delay_sec" + + # Increment the delay with exponential backoff, clamped to max_delay_ms + delay_ms=$((delay_ms * 2)) + if (( delay_ms > max_delay_ms )); then + delay_ms=$max_delay_ms + fi + + ((attempt++)) + done +} + +IFS=$';' +publications=('!{executions}') +for publication in "${publications[@]}"; do + eval $publication & +done +unset IFS +wait \ No newline at end of file diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/PublishOffloadManagerTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/PublishOffloadManagerTest.groovy index cde87e0ff3..2083df8389 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/PublishOffloadManagerTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/PublishOffloadManagerTest.groovy @@ -43,8 +43,7 @@ class PublishOffloadManagerTest extends Specification { when: poManager.init() then: - poManager.copyProcessor != null - poManager.moveProcessor != null + poManager.publishProcessor != null cleanup: session.classesDir?.deleteDir() diff --git a/tests/checks/publish-offload.nf/.config b/tests/checks/publish-offload.nf/.config index 1840185eb4..44d46055e9 100644 --- a/tests/checks/publish-offload.nf/.config +++ b/tests/checks/publish-offload.nf/.config @@ -1 +1 @@ -publishOffload = true +publishOffloadBatchSize = 2 From ac04d04d615dc4518c69b39bdd1dc9fd5eaa9cc2 Mon Sep 17 00:00:00 2001 From: jorgee Date: Fri, 20 Dec 2024 20:24:02 +0100 Subject: [PATCH 7/9] fix minor issue Signed-off-by: jorgee --- .../src/main/groovy/nextflow/Session.groovy | 4 +-- .../processor/PublishOffloadManager.groovy | 28 ++++++++++--------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index 6c5485f6a0..908faf9591 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -294,7 +294,7 @@ class Session implements ISession { FilePorter getFilePorter() { filePorter } - int publishOffloadBatchSize + private int publishOffloadBatchSize private PublishOffloadManager publishOffloadManager @@ -406,7 +406,7 @@ class Session implements ISession { if ( this.publishOffloadBatchSize ) { // -- publish offload manager config log.warn("Publish offload flag enabled. Creating Offload Manager") - this.publishOffloadManager = new PublishOffloadManager(this) + this.publishOffloadManager = new PublishOffloadManager(this, publishOffloadBatchSize) } } diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishOffloadManager.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishOffloadManager.groovy index 01aed46d6c..ff9cdf5e25 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishOffloadManager.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishOffloadManager.groovy @@ -40,15 +40,17 @@ class PublishOffloadManager { static final String PUBLISH_FUNCTION = 'nxf_publish' private Session session private PublishTaskProcessor publishProcessor - private List commands = new LinkedList(); - private boolean closed = false; + private List commands = new LinkedList() + private boolean closed = false + private int batchSize /** * Unique offloaded index number */ final protected AtomicInteger indexCount = new AtomicInteger() - PublishOffloadManager(Session session) { + PublishOffloadManager(Session session, int batchSize) { this.session = session; + this.batchSize = batchSize; } @PackageScope TaskProcessor getPublishProcessor(){ publishProcessor } @@ -64,7 +66,7 @@ class PublishOffloadManager { } private boolean checkOffload(Path source, Path destination, String executor){ - return session.publishOffloadBatchSize > 0 && source.scheme in SUPPORTED_SCHEMES[executor] && destination.scheme in SUPPORTED_SCHEMES[executor]; + return this.batchSize > 0 && source.scheme in SUPPORTED_SCHEMES[executor] && destination.scheme in SUPPORTED_SCHEMES[executor]; } private void invokeProcessor(inputValue) { @@ -77,12 +79,12 @@ class PublishOffloadManager { publishProcessor.invokeTask(args.toArray()) } - private synchronized boolean tryOffload(String command, Path origin, Path destination, PublishRetryConfig retryConfig, boolean failonError){ + private synchronized boolean tryOffload(String command, Path origin, Path destination, PublishRetryConfig retryConfig, boolean failOnError){ if (checkOffload(origin, destination, publishProcessor.executor.name)) { final id = indexCount.incrementAndGet() - runningPublications.put(id, Nextflow.tuple(origin, destination, failonError)) + runningPublications.put(id, Nextflow.tuple(origin, destination, failOnError)) commands.add(generateExecutionCommand(id, command, origin, destination, retryConfig)) - if (commands.size() == session.publishOffloadBatchSize){ + if (commands.size() == this.batchSize){ invokeProcessor(commands.join(";")) commands.clear() } @@ -112,20 +114,20 @@ class PublishOffloadManager { } } - boolean tryMoveOffload(Path origin, Path destination, PublishRetryConfig retryConfig, boolean failonError) { + boolean tryMoveOffload(Path origin, Path destination, PublishRetryConfig retryConfig, boolean failOnError) { String command = 'mv' if ( useS5cmd() ) { command = 's5cmd mv' } - tryOffload(command, origin, destination, retryConfig, failonError) + tryOffload(command, origin, destination, retryConfig, failOnError) } - boolean tryCopyOffload(Path origin, Path destination, PublishRetryConfig retryConfig, boolean failonError) { + boolean tryCopyOffload(Path origin, Path destination, PublishRetryConfig retryConfig, boolean failOnError) { String command = 'cp' if ( useS5cmd() ) { command = 's5cmd cp' } - tryOffload(command, origin, destination, retryConfig, failonError) + tryOffload(command, origin, destination, retryConfig, failOnError) } private PublishTaskProcessor createProcessor( String name, BodyDef body){ @@ -183,8 +185,8 @@ class PublishTaskProcessor extends TaskProcessor{ if (result.size() == 2) { final id = result[0] as Integer final tuple = manager.runningPublications.remove(id) - final exitcode = result[1] as Integer - if( exitcode == 0 ){ + final exitCode = result[1] as Integer + if( exitCode == 0 ){ session.notifyFilePublish((Path) tuple.get(0), (Path) tuple.get(1)) } else { if (tuple.get(2) as Boolean) { From 9ac038fd65981878d300fabf71e21c9def029c94 Mon Sep 17 00:00:00 2001 From: jorgee Date: Fri, 20 Dec 2024 20:42:38 +0100 Subject: [PATCH 8/9] fix unit test Signed-off-by: jorgee --- .../groovy/nextflow/processor/PublishOffloadManagerTest.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/PublishOffloadManagerTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/PublishOffloadManagerTest.groovy index 2083df8389..7af426409d 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/PublishOffloadManagerTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/PublishOffloadManagerTest.groovy @@ -39,7 +39,7 @@ class PublishOffloadManagerTest extends Specification { session.init(scriptFile) //session.start() session.script = script; - def poManager = new PublishOffloadManager(session); + def poManager = new PublishOffloadManager(session, 2); when: poManager.init() then: From 2c5c185481ae669c6f9615f0f219993bcba9506e Mon Sep 17 00:00:00 2001 From: jorgee Date: Fri, 20 Dec 2024 22:36:30 +0100 Subject: [PATCH 9/9] fix integration test Signed-off-by: jorgee --- tests/checks/publish-offload.nf/.checks | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/checks/publish-offload.nf/.checks b/tests/checks/publish-offload.nf/.checks index e53b74e04b..c40a2ea993 100644 --- a/tests/checks/publish-offload.nf/.checks +++ b/tests/checks/publish-offload.nf/.checks @@ -3,5 +3,5 @@ cp .config nextflow.config $NXF_RUN > stdout [[ `grep 'INFO' .nextflow.log | grep -c 'Submitted process > gen_data'` == 1 ]] || false -[[ `grep 'INFO' .nextflow.log | grep -c 'Submitted process > publish_dir_copy_process'` == 1 ]] || false +[[ `grep 'INFO' .nextflow.log | grep -c 'Submitted process > publish_process'` == 1 ]] || false [[ -f publishDir/chunk_1 ]] || false