Skip to content

Commit

Permalink
Merge branch 'develop' into aen_wx_1633_2
Browse files Browse the repository at this point in the history
  • Loading branch information
aednichols authored May 22, 2024
2 parents d65dbdb + 24c0546 commit b608112
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 7 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ Users reported cases where Life Sciences jobs failed due to insufficient quota,
quota is available (which is the expected behavior). Cromwell will now retry under these conditions, which present with errors
such as "PAPI error code 9", "no available zones", and/or "quota too low".

### Improved `size()` function performance on arrays

Resolved a hotspot in Cromwell so that the `size()` engine function performs much faster on file arrays. Common sources of file arrays include globs and scatter-gather results. This enhancement applies only to WDL 1.0 and later, because that is the version in which `size()` gained [support for arrays](https://github.com/openwdl/wdl/blob/main/versions/1.0/SPEC.md#acceptable-compound-input-types).


## 87 Release Notes

### GCP Batch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import wom.values.{WomString, WomValue}
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
import scala.util.control.NoStackTrace
import scala.util.{Failure, Success, Try}

object MaterializeWorkflowDescriptorActor {
Expand Down Expand Up @@ -287,10 +288,12 @@ class MaterializeWorkflowDescriptorActor(override val serviceRegistryActor: Acto
}

// Reports a failed workflow materialization: sends a MaterializeWorkflowDescriptorFailureResponse
// to the current `sender()`, wrapping an IllegalArgumentException whose aggregated messages are
// the supplied `errors` under the context "Workflow input processing failed".
// NOTE(review): `replyTo` is not referenced anywhere in the visible body; the reply goes to
// `sender()` instead — confirm that this is intentional.
private def workflowInitializationFailed(errors: NonEmptyList[String], replyTo: ActorRef) =
sender() ! MaterializeWorkflowDescriptorFailureResponse(new IllegalArgumentException with MessageAggregation {
val exceptionContext = "Workflow input processing failed"
val errorMessages = errors.toList
})
// NOTE(review): the statement below repeats the one above, with `NoStackTrace` additionally
// mixed in so the exception carries no stack trace. This view appears to show both the previous
// and the updated form of the same statement — confirm that only one is present in the real file.
sender() ! MaterializeWorkflowDescriptorFailureResponse(
new IllegalArgumentException with MessageAggregation with NoStackTrace {
val exceptionContext = "Workflow input processing failed"
val errorMessages = errors.toList
}
)

private def workflowOptionsAndPathBuilders(
sourceFiles: WorkflowSourceFilesCollection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import wom.CommandSetupSideEffectFile

import scala.concurrent.duration._
import scala.concurrent.Await
import scala.language.postfixOps
import scala.util.Try

object EngineFunctionEvaluators {
Expand Down Expand Up @@ -666,16 +667,28 @@ object EngineFunctionEvaluators {
case _ => false
}

// Sums the sizes of all `paths` through the IO function set, blocking for at most one hour.
// Both the call and the wait happen inside the Try, so any exception is captured in the
// resulting ErrorOr rather than thrown to the caller.
def parallelSize(paths: Seq[String]) = {
  val attempt = Try {
    val pendingTotal = ioFunctionSet.parallelSize(paths)
    Await.result(pendingTotal, 1 hour)
  }
  attempt.toErrorOr
}

// Inner function: Get the file size, allowing for unpacking of optionals and arrays.
// Returns the size as a valid Long, or an invalid ErrorOr carrying a message when `value`
// matches none of the cases below. An empty optional file contributes 0.
def optionalSafeFileSize(value: WomValue): ErrorOr[Long] = value match {
// A single file, or any value whose type can be coerced to one: coerce, then block on the
// asynchronous size lookup, capturing any exception in the ErrorOr.
case f if f.isInstanceOf[WomSingleFile] || WomSingleFileType.isCoerceableFrom(f.womType) =>
f.coerceToType[WomSingleFile] flatMap { file =>
// NOTE(review): two alternative waits are visible here (Duration.Inf and ReadWaitTimeout).
// This looks like the previous and updated form of one line — confirm which is in the real file.
Try(Await.result(ioFunctionSet.size(file.valueString), Duration.Inf)).toErrorOr
Try(Await.result(ioFunctionSet.size(file.valueString), ReadWaitTimeout)).toErrorOr
}
// Optional holding a value: recurse on the wrapped value.
case WomOptionalValue(f, Some(o)) if isOptionalOfFileType(f) => optionalSafeFileSize(o)
// Empty optional: contributes a size of zero.
case WomOptionalValue(f, None) if isOptionalOfFileType(f) => 0L.validNel
// Array guarded by isOptionalOfFileType on its element type: size each element in turn and sum.
// NOTE(review): this case and the `Array[File?]` case further down both appear to target arrays
// of optional files, and this one is tried first — confirm both are meant to be present.
case WomArray(WomArrayType(womType), values) if isOptionalOfFileType(womType) =>
values.toList.traverse(optionalSafeFileSize).map(_.sum)
case WomArray(WomArrayType(womType), values) if WomSingleFileType.isCoerceableFrom(womType) =>
// `Array[File]` optimization: parallelize the size calculation
parallelSize(values.map(_.valueString))
case WomArray(WomArrayType(womType), values) if WomOptionalType(WomSingleFileType).isCoerceableFrom(womType) =>
// `Array[File?]` optimization: parallelize the size calculation for defined files
// NOTE(review): the cast assumes every element is a WomOptionalValue. It is evaluated before
// parallelSize is entered, so a failed cast would throw instead of yielding an invalid ErrorOr.
parallelSize(values.flatMap(_.asInstanceOf[WomOptionalValue].value).map(_.valueString))
// Anything else is rejected with a message naming the offending type.
case _ =>
s"The 'size' method expects a 'File', 'File?', 'Array[File]' or Array[File?] argument but instead got ${value.womType.stableName}.".invalidNel
}
Expand Down
11 changes: 11 additions & 0 deletions wom/src/main/scala/wom/expression/WomExpression.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package wom.expression

import cats.data.Validated._
import cats.effect.IO
import cats.implicits.toTraverseOps
import common.validation.ErrorOr.ErrorOr
import wom.expression.IoFunctionSet.IoElement
import wom.types._
Expand Down Expand Up @@ -195,6 +196,16 @@ trait IoFunctionSet {
*/
def size(path: String): Future[Long]

/**
 * Computes the combined size of many files in a single call.
 *
 * Evaluating the size of a long list of files one after another was a hot spot (WX-1633).
 * The engine function evaluators that call into this trait are not async and don't have an
 * execution context, so the fan-out is done here and the implementation details stay hidden
 * from them. A size lookup is requested for every path up front and the results are then
 * combined; how concurrently the lookups actually run depends on the `size` implementation.
 *
 * @param paths a list of paths
 * @return the sum of the sizes of all the files located at the paths
 */
def parallelSize(paths: Seq[String]): Future[Long] = {
  val pendingSizes = paths.map(size)
  pendingSizes.sequence.map(_.sum)
}

/**
* To map/flatMap over IO results
*/
Expand Down

0 comments on commit b608112

Please sign in to comment.