Skip to content

Commit

Permalink
Use Papi log for retry with more memory.
Browse files Browse the repository at this point in the history
  • Loading branch information
kshakir committed May 13, 2024
1 parent 0477642 commit 3ccdcb4
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 72 deletions.
2 changes: 2 additions & 0 deletions backend/src/main/scala/cromwell/backend/io/JobPaths.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ trait JobPaths {
lazy val dockerCid = callExecutionRoot.resolve(dockerCidFilename)
lazy val returnCode = callExecutionRoot.resolve(returnCodeFilename)
lazy val memoryRetryRC = callExecutionRoot.resolve(memoryRetryRCFilename)
// Path to an existing file that contains the error text of the job if it failed due to memory constraints.
lazy val memoryRetryError = standardPaths.error

// This is a `def` because `standardPaths` is a `var` that may be reassigned during the calculation of
// standard output and error file names.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1361,7 +1361,7 @@ trait StandardAsyncExecutionActor
): Future[ExecutionHandle] = {

// Returns whether an OOM was detected for the task and, if so, the path of the file associated with the OOM error text
def memoryRetryRC: Future[Boolean] = {
def memoryRetryRC: Future[(Boolean, Option[Path])] = {

def readFile(path: Path, maxBytes: Option[Int]): Future[String] =
asyncIo.contentAsStringAsync(path, maxBytes, failOnOverflow = false)
Expand All @@ -1384,22 +1384,33 @@ trait StandardAsyncExecutionActor
}

def checkMemoryRetryStderr(errorKeys: List[String], maxBytes: Int): Future[Boolean] =
readFile(jobPaths.standardPaths.error, Option(maxBytes)) map { errorContent =>
readFile(jobPaths.memoryRetryError, Option(maxBytes)) map { errorContent =>
errorKeys.exists(errorContent.contains)
}

asyncIo.existsAsync(jobPaths.memoryRetryRC) flatMap {
case true => checkMemoryRetryRC()
case false =>
(memoryRetryErrorKeys, memoryRetryStderrLimit) match {
case (Some(keys), Some(limit)) =>
asyncIo.existsAsync(jobPaths.standardPaths.error) flatMap {
case true => checkMemoryRetryStderr(keys, limit)
case false => Future.successful(false)
}
case _ => Future.successful(false)
}
}
def checkMemoryRetryError(): Future[Boolean] =
(memoryRetryErrorKeys, memoryRetryStderrLimit) match {
case (Some(keys), Some(limit)) =>
for {
memoryRetryErrorExists <- asyncIo.existsAsync(jobPaths.memoryRetryError)
memoryRetryErrorFound <-
if (memoryRetryErrorExists) checkMemoryRetryStderr(keys, limit) else Future.successful(false)
} yield memoryRetryErrorFound
case _ => Future.successful(false)

Check warning on line 1399 in backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala#L1399

Added line #L1399 was not covered by tests
}

// For backwards behavioral compatibility, check for the old memory retry RC file first. That file used to catch
// the errors from the standard error file, but now sometimes the error is written to a separate log file.
// If the RC file exists, check its contents. If it is missing or holds no OOM code, check the new memory retry error file.
for {
memoryRetryRCExists <- asyncIo.existsAsync(jobPaths.memoryRetryRC)
memoryRetryRCErrorFound <- if (memoryRetryRCExists) checkMemoryRetryRC() else Future.successful(false)
memoryRetryErrorFound <- if (memoryRetryRCErrorFound) Future.successful(true) else checkMemoryRetryError()
memoryErrorPathOption =
if (memoryRetryRCErrorFound) Option(jobPaths.standardPaths.error)
else if (memoryRetryErrorFound) Option(jobPaths.memoryRetryError)
else None
} yield (memoryRetryErrorFound, memoryErrorPathOption)
}

val stderr = jobPaths.standardPaths.error
Expand All @@ -1410,70 +1421,72 @@ trait StandardAsyncExecutionActor
// Only check stderr size if we need to, otherwise this results in a lot of unnecessary I/O that
// may fail due to race conditions on quickly-executing jobs.
stderrSize <- if (failOnStdErr) asyncIo.sizeAsync(stderr) else Future.successful(0L)
outOfMemoryDetected <- memoryRetryRC
} yield (stderrSize, returnCodeAsString, outOfMemoryDetected)

stderrSizeAndReturnCodeAndMemoryRetry flatMap { case (stderrSize, returnCodeAsString, outOfMemoryDetected) =>
val tryReturnCodeAsInt = Try(returnCodeAsString.trim.toInt)

if (isDone(status)) {
tryReturnCodeAsInt match {
case Success(returnCodeAsInt) if failOnStdErr && stderrSize.intValue > 0 =>
val executionHandle = Future.successful(
FailedNonRetryableExecutionHandle(StderrNonEmpty(jobDescriptor.key.tag, stderrSize, stderrAsOption),
Option(returnCodeAsInt),
None
(outOfMemoryDetected, outOfMemoryPathOption) <- memoryRetryRC
} yield (stderrSize, returnCodeAsString, outOfMemoryDetected, outOfMemoryPathOption)

stderrSizeAndReturnCodeAndMemoryRetry flatMap {
case (stderrSize, returnCodeAsString, outOfMemoryDetected, outOfMemoryPathOption) =>
val tryReturnCodeAsInt = Try(returnCodeAsString.trim.toInt)

if (isDone(status)) {
tryReturnCodeAsInt match {
case Success(returnCodeAsInt) if failOnStdErr && stderrSize.intValue > 0 =>
val executionHandle = Future.successful(
FailedNonRetryableExecutionHandle(StderrNonEmpty(jobDescriptor.key.tag, stderrSize, stderrAsOption),
Option(returnCodeAsInt),
None
)
)
)
retryElseFail(executionHandle)
case Success(returnCodeAsInt) if continueOnReturnCode.continueFor(returnCodeAsInt) =>
handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
// It's important that we check retryWithMoreMemory case before isAbort. RC could be 137 in either case;
// if it was caused by OOM killer, want to handle as OOM and not job abort.
case Success(returnCodeAsInt) if outOfMemoryDetected && memoryRetryRequested =>
val executionHandle = Future.successful(
FailedNonRetryableExecutionHandle(
RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log),
Option(returnCodeAsInt),
None
retryElseFail(executionHandle)
case Success(returnCodeAsInt) if continueOnReturnCode.continueFor(returnCodeAsInt) =>
handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
// It's important that we check retryWithMoreMemory case before isAbort. RC could be 137 in either case;
// if it was caused by OOM killer, want to handle as OOM and not job abort.
case Success(returnCodeAsInt) if outOfMemoryDetected && memoryRetryRequested =>
val executionHandle = Future.successful(
FailedNonRetryableExecutionHandle(
RetryWithMoreMemory(jobDescriptor.key.tag, outOfMemoryPathOption, memoryRetryErrorKeys, log),
Option(returnCodeAsInt),
None
)
)
)
retryElseFail(executionHandle, outOfMemoryDetected)
case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) =>
Future.successful(AbortedExecutionHandle)
case Success(returnCodeAsInt) =>
val executionHandle = Future.successful(
FailedNonRetryableExecutionHandle(WrongReturnCode(jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption),
Option(returnCodeAsInt),
None
retryElseFail(executionHandle, outOfMemoryDetected)
case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) =>
Future.successful(AbortedExecutionHandle)
case Success(returnCodeAsInt) =>
val executionHandle = Future.successful(
FailedNonRetryableExecutionHandle(
WrongReturnCode(jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption),
Option(returnCodeAsInt),
None
)
)
)
retryElseFail(executionHandle)
case Failure(_) =>
Future.successful(
FailedNonRetryableExecutionHandle(
ReturnCodeIsNotAnInt(jobDescriptor.key.tag, returnCodeAsString, stderrAsOption),
kvPairsToSave = None
retryElseFail(executionHandle)
case Failure(_) =>
Future.successful(
FailedNonRetryableExecutionHandle(
ReturnCodeIsNotAnInt(jobDescriptor.key.tag, returnCodeAsString, stderrAsOption),
kvPairsToSave = None

Check warning on line 1469 in backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala#L1466-L1469

Added lines #L1466 - L1469 were not covered by tests
)
)
)
}
} else {
tryReturnCodeAsInt match {
case Success(returnCodeAsInt)
if outOfMemoryDetected && memoryRetryRequested && !continueOnReturnCode.continueFor(returnCodeAsInt) =>
val executionHandle = Future.successful(
FailedNonRetryableExecutionHandle(
RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log),
Option(returnCodeAsInt),
None
}
} else {
tryReturnCodeAsInt match {
case Success(returnCodeAsInt)
if outOfMemoryDetected && memoryRetryRequested && !continueOnReturnCode.continueFor(returnCodeAsInt) =>
val executionHandle = Future.successful(
FailedNonRetryableExecutionHandle(
RetryWithMoreMemory(jobDescriptor.key.tag, outOfMemoryPathOption, memoryRetryErrorKeys, log),
Option(returnCodeAsInt),
None
)
)
)
retryElseFail(executionHandle, outOfMemoryDetected)
case _ =>
val failureStatus = handleExecutionFailure(status, tryReturnCodeAsInt.toOption)
retryElseFail(failureStatus)
retryElseFail(executionHandle, outOfMemoryDetected)
case _ =>
val failureStatus = handleExecutionFailure(status, tryReturnCodeAsInt.toOption)
retryElseFail(failureStatus)
}
}
}
} recoverWith { case exception =>
if (isDone(status)) Future.successful(FailedNonRetryableExecutionHandle(exception, kvPairsToSave = None))
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ case class PipelinesApiJobPaths(override val workflowPaths: PipelinesApiWorkflow
val jesMonitoringScriptFilename: String = s"${PipelinesApiJobPaths.JesMonitoringKey}.sh"
val jesMonitoringImageScriptFilename: String = s"${PipelinesApiJobPaths.JesMonitoringImageKey}.sh"

// Point the memory-retry error check at `jesLogPath` (the path published as the backend ":log" in
// `customMetadataPaths`) in place of the inherited default.
// NOTE(review): presumably this is the Papi log where OOM messages land for this backend — confirm.
override lazy val memoryRetryError: Path = jesLogPath

override lazy val customMetadataPaths = Map(
CallMetadataKeys.BackendLogsPrefix + ":log" -> jesLogPath
) ++ (
Expand Down

0 comments on commit 3ccdcb4

Please sign in to comment.