Skip to content

Commit

Permalink
most PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
THWiseman committed Oct 31, 2023
1 parent 75ca3ea commit 5b6852c
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ object BlobSasTokenGenerator {

}

case class WSMTerraCoordinates(wsmEndpoint: String, workspaceId: UUID, containerResourceId: UUID)

case class WSMBlobSasTokenGenerator(wsmClientProvider: WorkspaceManagerApiClientProvider,
overrideWsmAuthToken: Option[String]) extends BlobSasTokenGenerator {

Expand Down Expand Up @@ -203,12 +205,36 @@ case class WSMBlobSasTokenGenerator(wsmClientProvider: WorkspaceManagerApiClient
wsmResourceClient.findContainerResourceId(workspaceId, container)
}

def getWsmAuth: Try[String] = {
private def getWsmAuth: Try[String] = {
overrideWsmAuthToken match {
case Some(t) => Success(t)
case None => AzureCredentials.getAccessToken(None).toTry

Check warning on line 211 in filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala

View check run for this annotation

Codecov / codecov/patch

filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala#L209-L211

Added lines #L209 - L211 were not covered by tests
}
}
private def parseTerraWorkspaceIdFromPath(blobPath: BlobPath): Try[UUID] = {
if (blobPath.container.value.startsWith("sc-")) Try(UUID.fromString(blobPath.container.value.substring(3)))
else Failure(new Exception("Could not parse workspace ID from storage container. Are you sure this is a file in a Terra Workspace?"))

Check warning on line 216 in filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala

View check run for this annotation

Codecov / codecov/patch

filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala#L215-L216

Added lines #L215 - L216 were not covered by tests
}

/**
* If the provided blob path looks like it comes from a terra workspace, return an end point that, when called with GET
* and proper authentication, will return a sas token capable of accessing the container the blob path points to.
* @param blobPath A blob path of a file living in a blob container that WSM knows about (likely a workspace container).
*
* NOTE: This function makes two synchronous REST requests.
*/
def getWSMSasFetchEndpoint(blobPath: BlobPath): Try[String] = {
val wsmEndpoint = wsmClientProvider.getBaseWorkspaceManagerUrl

Check warning on line 227 in filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala

View check run for this annotation

Codecov / codecov/patch

filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala#L227

Added line #L227 was not covered by tests
val terraInfo: Try[WSMTerraCoordinates] = for {
workspaceId <- parseTerraWorkspaceIdFromPath(blobPath)
auth <- getWsmAuth
containerResourceId <- getContainerResourceId(workspaceId, blobPath.container, auth)
coordinates = WSMTerraCoordinates(wsmEndpoint, workspaceId, containerResourceId)
} yield coordinates
terraInfo.map{terraCoordinates =>

Check warning on line 234 in filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala

View check run for this annotation

Codecov / codecov/patch

filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala#L229-L234

Added lines #L229 - L234 were not covered by tests
s"${terraCoordinates.wsmEndpoint}/api/workspaces/v1/${terraCoordinates.workspaceId.toString}/resources/controlled/azure/storageContainer/${terraCoordinates.containerResourceId.toString}/getSasToken"
}
}
}

case class NativeBlobSasTokenGenerator(subscription: Option[SubscriptionId] = None) extends BlobSasTokenGenerator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import cromwell.filesystems.blob.BlobPathBuilder._

import java.net.{MalformedURLException, URI}
import java.nio.file.{Files, LinkOption}
import java.util.UUID
import scala.jdk.CollectionConverters._
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -189,28 +188,7 @@ case class BlobPath private[blob](pathString: String, endpoint: EndpointURL, con
*/
def pathWithoutContainer : String = pathString

def parseTerraWorkspaceIdFromPath: Try[UUID] = {
if(container.value.startsWith("sc-")) Try(UUID.fromString(container.value.substring(3))) else Failure(new Exception("Could not parse workspace ID from storage container"))
}

private def getWSMTokenGenerator: Try[WSMBlobSasTokenGenerator] = {
fsm.blobTokenGenerator match {
case wsmGenerator: WSMBlobSasTokenGenerator => Try(wsmGenerator)
case _: Any => Failure(new NoSuchElementException("This blob file does not have an associated WSMBlobSasTokenGenerator"))
}
}
def containerWSMResourceId: Try[UUID] = {
for {
generator <- getWSMTokenGenerator
workspaceId <- parseTerraWorkspaceIdFromPath
wsmAuth <- generator.getWsmAuth
resourceId <- generator.getContainerResourceId(workspaceId, container, wsmAuth)
} yield resourceId
}

def wsmEndpoint: Try[String] = {
getWSMTokenGenerator.map(generator => generator.wsmClientProvider.getBaseWorkspaceManagerUrl)
}
def getFilesystemManager: BlobFileSystemManager = fsm

Check warning on line 191 in filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala

View check run for this annotation

Codecov / codecov/patch

filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala#L191

Added line #L191 was not covered by tests

override def getSymlinkSafePath(options: LinkOption*): Path = toAbsolutePath

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import cromwell.backend.async.{AbortedExecutionHandle, ExecutionHandle, FailedNo
import cromwell.backend.impl.tes.TesAsyncBackendJobExecutionActor.{determineWSMSasEndpointFromInputs, generateLocalizedSasScriptPreamble}
import cromwell.backend.impl.tes.TesResponseJsonFormatter._
import cromwell.backend.standard.{StandardAsyncExecutionActor, StandardAsyncExecutionActorParams, StandardAsyncJob}
import cromwell.core.logging.JobLogger
import cromwell.core.path.{DefaultPathBuilder, Path}
import cromwell.core.retry.Retry._
import cromwell.core.retry.SimpleExponentialBackoff
import cromwell.filesystems.blob.{BlobPath, BlobPathBuilder}
import cromwell.filesystems.blob.BlobPathBuilder.ValidBlobPath
import cromwell.filesystems.blob.{BlobContainerName, BlobPath, BlobPathBuilder, WSMBlobSasTokenGenerator}
import cromwell.filesystems.drs.{DrsPath, DrsResolver}
import net.ceedubs.ficus.Ficus._
import wom.values.WomFile
Expand Down Expand Up @@ -114,31 +116,35 @@ object TesAsyncBackendJobExecutionActor {
*/
def determineWSMSasEndpointFromInputs(taskInputs: List[Input],
pathGetter: String => Try[Path],
blobConverter: Try[Path] => Try[BlobPath] = maybeConvertToBlob): Option[String] = {
val shouldLocalizeSas = true //TODO: Make this a Workflow Option or come from the WDL
if (!shouldLocalizeSas) return None

logger: JobLogger,
blobConverter: Try[Path] => Try[BlobPath] = maybeConvertToBlob): Try[String] = {
// Collect all of the inputs that are valid blob paths
val blobFiles = taskInputs.collect{
case input if input.url.isDefined => BlobPathBuilder.validateBlobPath(input.url.get)
case Input(_, _, Some(url), _, _, _) => BlobPathBuilder.validateBlobPath(url)
}.collect{
case valid: BlobPathBuilder.ValidBlobPath => valid

Check warning on line 125 in supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala#L122-L125

Added lines #L122 - L125 were not covered by tests
}
if(blobFiles.isEmpty) return None

// We use the first blob file in the list as a template for determining the localized sas params
val sasTokenEndpoint = for {
blob <- blobConverter(pathGetter(blobFiles.head.toUrl))
wsmEndpoint <- blob.wsmEndpoint
workspaceId <- blob.parseTerraWorkspaceIdFromPath
containerResourceId <- blob.containerWSMResourceId
endpoint = s"$wsmEndpoint/api/workspaces/v1/$workspaceId/resources/controlled/azure/storageContainer/$containerResourceId/getSasToken"
} yield endpoint
// Log if not all input files live in the same container.
// We'll do our best anyway, but will still only be able to retrieve a token for a single container.
if(blobFiles.forall(_.container == blobFiles.headOption.map(file => file.container).getOrElse(BlobContainerName("no_container")))) {
logger.warn(s"While parsing blob inputs, found more than one container. Can only generate an environment sas token for a single blob container at once.")

Check warning on line 131 in supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala#L130-L131

Added lines #L130 - L131 were not covered by tests
}

sasTokenEndpoint match {
case good: Success[String] => Some(good.value)
case _: Any => None
// We use the first blob file in the list as a template for determining the localized sas params
val headBlob: Try[ValidBlobPath] = blobFiles.headOption match {
case Some(validBlob) => Try(validBlob)
case _ => Failure(new NoSuchElementException("No valid blob file for determining WSM end point found in task inputs."))

Check warning on line 137 in supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala#L135-L137

Added lines #L135 - L137 were not covered by tests
}

for {
blobFile <- headBlob
blob <- blobConverter(pathGetter(blobFile.toUrl))
endpoint <- blob.getFilesystemManager.blobTokenGenerator match {
case wsmGenerator: WSMBlobSasTokenGenerator => wsmGenerator.getWSMSasFetchEndpoint(blob)
case _ => Failure(new NoSuchElementException("This blob file does not have an associated WSMBlobSasTokenGenerator"))

Check warning on line 145 in supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala#L141-L145

Added lines #L141 - L145 were not covered by tests
}
} yield endpoint
}
}

Expand Down Expand Up @@ -171,17 +177,18 @@ class TesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn
}

override def scriptPreamble: String = {
val tesTaskPreamble: String = runtimeAttributes.localizedSasEnvVar.map{enviornmentVariableName =>
val workflowName = workflowDescriptor.callable.name
val callInputFiles = jobDescriptor.fullyQualifiedInputs.safeMapValues {
val tesTaskPreamble: String = runtimeAttributes.localizedSasEnvVar match {
case Some(environmentVariableName) =>
val workflowName = workflowDescriptor.callable.name
val callInputFiles = jobDescriptor.fullyQualifiedInputs.safeMapValues {
_.collectAsSeq { case w: WomFile => w }

Check warning on line 184 in supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala#L181-L184

Added lines #L181 - L184 were not covered by tests
}
val taskInputs: List[Input] = TesTask.buildTaskInputs(callInputFiles, workflowName, mapCommandLineWomFile)
val preamble = determineWSMSasEndpointFromInputs(taskInputs, getPath).map{ endpoint =>
generateLocalizedSasScriptPreamble(enviornmentVariableName, endpoint)
}.getOrElse("")
preamble
}.getOrElse("")
}
val taskInputs: List[Input] = TesTask.buildTaskInputs(callInputFiles, workflowName, mapCommandLineWomFile)

Check warning on line 186 in supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala#L186

Added line #L186 was not covered by tests
determineWSMSasEndpointFromInputs(taskInputs, getPath, jobLogger).map { endpoint =>
generateLocalizedSasScriptPreamble(environmentVariableName, endpoint)
}.getOrElse("")

Check warning on line 189 in supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala#L189

Added line #L189 was not covered by tests
case _ => ""
}
super.scriptPreamble ++ tesTaskPreamble
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object TesRuntimeAttributes {
val DockerWorkingDirKey = "dockerWorkingDir"
val DiskSizeKey = "disk"
val PreemptibleKey = "preemptible"
val LocalizedSasKey = "sasEnvironmentVariable"
val LocalizedSasKey = "azureSasEnvironmentVariable"

private def cpuValidation(runtimeConfig: Option[Config]): OptionalRuntimeAttributesValidation[Int Refined Positive] = CpuValidation.optional

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package cromwell.backend.impl.tes

import common.mock.MockSugar
import cromwell.core.logging.JobLogger
import cromwell.core.path
import cromwell.filesystems.blob.{BlobContainerName, BlobPath}
import cromwell.filesystems.blob.{BlobFileSystemManager, BlobPath, WSMBlobSasTokenGenerator}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import java.util.UUID

import scala.util.{Failure, Success, Try}

class TesAsyncBackendJobExecutionActorSpec extends AnyFlatSpec with Matchers with MockSugar {
Expand Down Expand Up @@ -61,16 +62,20 @@ class TesAsyncBackendJobExecutionActorSpec extends AnyFlatSpec with Matchers wit
val mockWorkspaceId = "e58ed763-928c-4155-0000-fdbaaadc15f3"
val mockContainerResourceId = "e58ed763-928c-4155-1111-fdbaaadc15f3"

val mockLogger: JobLogger = mock[JobLogger]
val mockBlobPath: BlobPath = mock[BlobPath]
val mockTokenGenerator: WSMBlobSasTokenGenerator = mock[WSMBlobSasTokenGenerator]
val mockFsm: BlobFileSystemManager = mock[BlobFileSystemManager]

mockTokenGenerator.getWSMSasFetchEndpoint(mockBlobPath) returns Try(s"$mockWsmEndpoint/api/workspaces/v1/$mockWorkspaceId/resources/controlled/azure/storageContainer/$mockContainerResourceId/getSasToken")
mockFsm.blobTokenGenerator returns mockTokenGenerator

val mockNioPath: path.NioPath = mock[path.NioPath]

mockBlobPath.container returns BlobContainerName("1234")
mockBlobPath.wsmEndpoint returns Try(mockWsmEndpoint)
mockBlobPath.parseTerraWorkspaceIdFromPath returns Try(UUID.fromString(mockWorkspaceId))
mockBlobPath.containerWSMResourceId returns Try(UUID.fromString(mockContainerResourceId))
mockBlobPath.nioPath returns mockNioPath
mockBlobPath.md5 returns "BLOB_MD5"
mockBlobPath.getFilesystemManager returns mockFsm
mockBlobPath.toAbsolutePath returns mockBlobPath
mockBlobPath.md5 returns "MOCK_MD5"


val mockPath: cromwell.core.path.Path = mock[cromwell.core.path.Path]
def mockPathGetter(pathString: String): Try[cromwell.core.path.Path] = {
Expand All @@ -82,23 +87,23 @@ class TesAsyncBackendJobExecutionActorSpec extends AnyFlatSpec with Matchers wit

def mockBlobConverter(pathToConvert: Try[cromwell.core.path.Path]): Try[BlobPath] = {
//using a stubbed md5 rather than matching on type because type matching of mocked types at runtime causes problems
if (pathToConvert.get.md5.equals("BLOB_MD5")) pathToConvert.asInstanceOf[Try[BlobPath]] else Failure(new Exception("failed"))
if (pathToConvert.get.md5.equals("MOCK_MD5")) pathToConvert.asInstanceOf[Try[BlobPath]] else Failure(new Exception("failed"))
}

it should "not return sas endpoint when no blob paths are provided" in {
val emptyInputs: List[Input] = List()
val bloblessInputs: List[Input] = List(notBlobInput_1, notBlobInput_2)
TesAsyncBackendJobExecutionActor.determineWSMSasEndpointFromInputs(emptyInputs, mockPathGetter, mockBlobConverter).isEmpty shouldBe true
TesAsyncBackendJobExecutionActor.determineWSMSasEndpointFromInputs(bloblessInputs, mockPathGetter, mockBlobConverter).isEmpty shouldBe true
TesAsyncBackendJobExecutionActor.determineWSMSasEndpointFromInputs(emptyInputs, mockPathGetter, mockLogger, mockBlobConverter).isFailure shouldBe true
TesAsyncBackendJobExecutionActor.determineWSMSasEndpointFromInputs(bloblessInputs, mockPathGetter, mockLogger, mockBlobConverter).isFailure shouldBe true
}

it should "return a sas endpoint based on inputs when blob paths are provided" in {
val expected = s"$mockWsmEndpoint/api/workspaces/v1/$mockWorkspaceId/resources/controlled/azure/storageContainer/$mockContainerResourceId/getSasToken"
val blobInput: List[Input] = List(blobInput_0)
val blobInputs: List[Input] = List(blobInput_0, blobInput_1)
val mixedInputs: List[Input] = List(notBlobInput_1, blobInput_0, blobInput_1)
TesAsyncBackendJobExecutionActor.determineWSMSasEndpointFromInputs(blobInput, mockPathGetter, mockBlobConverter).get shouldEqual expected
TesAsyncBackendJobExecutionActor.determineWSMSasEndpointFromInputs(blobInputs, mockPathGetter, mockBlobConverter).get shouldEqual expected
TesAsyncBackendJobExecutionActor.determineWSMSasEndpointFromInputs(mixedInputs, mockPathGetter, mockBlobConverter).get shouldEqual expected
TesAsyncBackendJobExecutionActor.determineWSMSasEndpointFromInputs(blobInput, mockPathGetter, mockLogger, mockBlobConverter).get shouldEqual expected
TesAsyncBackendJobExecutionActor.determineWSMSasEndpointFromInputs(blobInputs, mockPathGetter, mockLogger, mockBlobConverter).get shouldEqual expected
TesAsyncBackendJobExecutionActor.determineWSMSasEndpointFromInputs(mixedInputs, mockPathGetter, mockLogger, mockBlobConverter).get shouldEqual expected
}
}

0 comments on commit 5b6852c

Please sign in to comment.