Skip to content

Commit

Permalink
ID-1276 Introduce Bard Service for sending metrics (#7434)
Browse files Browse the repository at this point in the history
Co-authored-by: Tristan Garwood <[email protected]>
Co-authored-by: Adam Nichols <[email protected]>
Co-authored-by: Janet Gainer-Dewar <[email protected]>
  • Loading branch information
4 people authored Jun 17, 2024
1 parent ea67a13 commit 748f488
Show file tree
Hide file tree
Showing 30 changed files with 720 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import wom.graph.CommandCallNode

import scala.concurrent.ExecutionContext

trait BackendLifecycleActorFactory {
trait BackendLifecycleActorFactory extends PlatformSpecific {

/**
* Name of the backend.
Expand Down Expand Up @@ -166,7 +166,7 @@ trait BackendLifecycleActorFactory {
/**
* Allows Cromwell to self-identify which cloud it's running on for runtime attribute purposes
*/
def platform: Option[Platform] = None
override def platform: Option[Platform] = None
}

object BackendLifecycleActorFactory {
Expand Down
15 changes: 15 additions & 0 deletions backend/src/main/scala/cromwell/backend/backend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,18 @@ object Azure extends Platform {
object Aws extends Platform {
override def runtimeKey: String = "aws"
}
trait PlatformSpecific {
def platform: Option[Platform]

}
trait GcpPlatform extends PlatformSpecific {
override val platform: Option[Platform] = Option(Gcp)
}

trait AzurePlatform extends PlatformSpecific {
override val platform: Option[Platform] = Option(Azure)
}

trait AwsPlatform extends PlatformSpecific {
override val platform: Option[Platform] = Option(Aws)
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class DummyAsyncExecutionActor(override val standardParams: StandardAsyncExecuti
with StandardAsyncExecutionActor
with CromwellInstrumentation {

override def platform: Option[Nothing] = None

/** The type of the run info when a job is started. */
override type StandardAsyncRunInfo = String

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,23 @@ import cromwell.core.path.Path
import cromwell.services.keyvalue.KeyValueServiceActor._
import cromwell.services.keyvalue.KvClient
import cromwell.services.metadata.CallMetadataKeys
import cromwell.services.metrics.bard.BardEventing.BardEventRequest
import cromwell.services.metrics.bard.model.TaskSummaryEvent
import eu.timepit.refined.refineV
import mouse.all._
import net.ceedubs.ficus.Ficus._
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils
import shapeless.Coproduct
import wdl4s.parser.MemoryUnit
import wom.callable.{AdHocValue, CommandTaskDefinition, ContainerizedInputExpression}
import wom.expression.WomExpression
import wom.graph.LocalName
import wom.values._
import wom.{CommandSetupSideEffectFile, InstantiatedCommand, WomFileMapper}

import java.time.OffsetDateTime
import java.time.temporal.ChronoUnit
import java.io.IOException
import scala.concurrent._
import scala.concurrent.duration._
Expand Down Expand Up @@ -70,6 +75,8 @@ case class DefaultStandardAsyncExecutionActorParams(
// Override to `false` when we need the script to set an environment variable in the parent shell.
case class ScriptPreambleData(bashString: String, executeInSubshell: Boolean = true)

case class StartAndEndTimes(jobStart: OffsetDateTime, cpuStart: Option[OffsetDateTime], jobEnd: OffsetDateTime)

/**
* An extension of the generic AsyncBackendJobExecutionActor providing a standard abstract implementation of an
* asynchronous polling backend.
Expand All @@ -85,7 +92,8 @@ trait StandardAsyncExecutionActor
with StandardCachingActorHelper
with AsyncIoActorClient
with KvClient
with SlowJobWarning {
with SlowJobWarning
with PlatformSpecific {
this: Actor with ActorLogging with BackendJobLifecycleActor =>

override lazy val ioCommandBuilder: IoCommandBuilder = DefaultIoCommandBuilder
Expand Down Expand Up @@ -894,6 +902,14 @@ trait StandardAsyncExecutionActor
*/
def getTerminalEvents(runStatus: StandardAsyncRunState): Seq[ExecutionEvent] = Seq.empty

/**
* Get the min and max event times from a terminal run status
*
* @param runStatus The terminal run status, as defined by isTerminal.
* @return The min and max event times, if events exist.
*/
def getStartAndEndTimes(runStatus: StandardAsyncRunState): Option[StartAndEndTimes] = None

/**
* Returns true if the status represents a completion.
*
Expand Down Expand Up @@ -1316,6 +1332,7 @@ trait StandardAsyncExecutionActor
val metadata = getTerminalMetadata(state)
onTaskComplete(state, oldHandle)
tellMetadata(metadata)
tellBard(state)
handleExecutionResult(state, oldHandle)
case s =>
Future.successful(
Expand Down Expand Up @@ -1509,6 +1526,40 @@ trait StandardAsyncExecutionActor
serviceRegistryActor.putMetadata(jobDescriptor.workflowDescriptor.id, Option(jobDescriptor.key), metadataKeyValues)
}

def tellBard(state: StandardAsyncRunState): Unit =
getStartAndEndTimes(state) match {
case Some(startAndEndTimes: StartAndEndTimes) =>
val dockerImage =
RuntimeAttributesValidation.extractOption(DockerValidation.instance, validatedRuntimeAttributes)
val cpus = RuntimeAttributesValidation.extract(CpuValidation.instance, validatedRuntimeAttributes).value
val memory = RuntimeAttributesValidation
.extract(MemoryValidation.instance(), validatedRuntimeAttributes)
.to(MemoryUnit.Bytes)
.amount
serviceRegistryActor ! BardEventRequest(
TaskSummaryEvent(
workflowDescriptor.id.id,
workflowDescriptor.possibleParentWorkflowId.map(_.id),
workflowDescriptor.rootWorkflowId.id,
jobDescriptor.key.tag,
jobDescriptor.key.call.fullyQualifiedName,
jobDescriptor.key.index,
jobDescriptor.key.attempt,
state.getClass.getSimpleName,
platform.map(_.runtimeKey),
dockerImage,
cpus,
memory,
startAndEndTimes.jobStart.toString,
startAndEndTimes.cpuStart.map(_.toString),
startAndEndTimes.jobEnd.toString,
startAndEndTimes.jobStart.until(startAndEndTimes.jobEnd, ChronoUnit.SECONDS),
startAndEndTimes.cpuStart.map(_.until(startAndEndTimes.jobEnd, ChronoUnit.SECONDS))
)
)
case _ => ()
}

implicit override protected lazy val ec: ExecutionContextExecutor = context.dispatcher
}

Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ lazy val languageFactoryCore = (project in languageFactoryRoot / "language-facto
.dependsOn(common % "test->test")

lazy val wdlDraft2LanguageFactory = (project in languageFactoryRoot / "wdl-draft2")
.withLibrarySettings("wdl-draft2", draft2LanguageFactoryDependencies)
.withLibrarySettings("wdl-draft2", mockServerDependencies)
.dependsOn(languageFactoryCore)
.dependsOn(common % "test->test")
.dependsOn(wdlModelDraft2)
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ akka {
mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox"
}

bard-actor-mailbox {
mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox"
}

dispatchers {
# A dispatcher for actors performing blocking io operations
# Prevents the whole system from being slowed down when waiting for responses from external resources for instance
Expand Down Expand Up @@ -582,6 +586,17 @@ services {
# ecm.base-url = ""
}
}
// Bard is used for metrics collection in the Terra SaaS offering and is not applicable outside of it.
Bard {
class = "cromwell.services.metrics.bard.impl.BardEventingActor"
config {
enabled = false
bard {
base-url = ""
connection-pool-size = 0
}
}
}
}

include required(classpath("reference_database.inc.conf"))
Expand Down
17 changes: 12 additions & 5 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ object Dependencies {
private val akkaV = "2.5.32" // scala-steward:off (CROM-6637)
private val ammoniteOpsV = "2.4.1"
private val apacheHttpClientV = "4.5.13"
private val apacheHttpClient5V = "5.3.1"
private val awsSdkV = "2.17.265"
// We would like to use the BOM to manage Azure SDK versions, but SBT doesn't support it.
// https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/boms/azure-sdk-bom
Expand All @@ -21,6 +22,7 @@ object Dependencies {
private val azureAppInsightsLogbackV = "2.6.4"
private val betterFilesV = "3.9.1"
private val jsonSmartV = "2.4.10"
private val bardClientV = "1.0.4"
/*
cats-effect, fs2, http4s, and sttp (also to v3) should all be upgraded at the same time to use cats-effect 3.x.
*/
Expand Down Expand Up @@ -421,8 +423,8 @@ object Dependencies {
exclude("org.apache.httpcomponents", "httpclient"),
"org.broadinstitute.dsde.workbench" %% "workbench-google" % workbenchGoogleV
exclude("com.google.apis", "google-api-services-genomics"),
"org.apache.httpcomponents" % "httpclient" % apacheHttpClientV,
"com.google.apis" % "google-api-services-cloudkms" % googleCloudKmsV
"org.apache.httpcomponents.client5" % "httpclient5" % apacheHttpClient5V,
"com.google.apis" % "google-api-services-cloudkms" % googleCloudKmsV
exclude("com.google.guava", "guava-jdk5"),
"org.glassfish.hk2.external" % "jakarta.inject" % jakartaInjectV,
) ++ googleGenomicsV2Alpha1Dependency ++ googleLifeSciencesV2BetaDependency ++ googleBatchv1Dependency
Expand Down Expand Up @@ -529,7 +531,7 @@ object Dependencies {
"jakarta.activation" % "jakarta.activation-api" % jakartaActivationV,
)

val draft2LanguageFactoryDependencies = List(
val mockServerDependencies = List(
"org.mock-server" % "mockserver-netty" % mockserverNettyV % Test
)

Expand Down Expand Up @@ -593,7 +595,12 @@ object Dependencies {
val servicesDependencies: List[ModuleID] = List(
"com.google.api" % "gax-grpc" % googleGaxGrpcV,
"org.apache.commons" % "commons-csv" % commonsCsvV,
) ++ testDatabaseDependencies ++ akkaHttpDependencies
"bio.terra" % "bard-client-resttemplate-javax" % bardClientV
exclude("org.springframework", "spring-aop")
exclude("org.springframework", "spring-jcl"),
"org.apache.httpcomponents.client5" % "httpclient5" % apacheHttpClient5V // Needed for rest-template connection pooling

) ++ testDatabaseDependencies ++ akkaHttpDependencies ++ mockServerDependencies

val serverDependencies: List[ModuleID] = slf4jBindingDependencies

Expand Down Expand Up @@ -664,7 +671,7 @@ object Dependencies {
cromwellApiClientDependencies ++
databaseMigrationDependencies ++
databaseSqlDependencies ++
draft2LanguageFactoryDependencies ++
mockServerDependencies ++
drsLocalizerDependencies ++
engineDependencies ++
gcsFileSystemDependencies ++
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package cromwell.services.metrics.bard

import com.typesafe.config.Config
import net.ceedubs.ficus.Ficus._

final case class BardConfig(enabled: Boolean, baseUrl: String, connectionPoolSize: Int)

object BardConfig {
def apply(config: Config): BardConfig = BardConfig(config.as[Boolean]("enabled"),
config.as[String]("bard.base-url"),
config.as[Int]("bard.connection-pool-size")
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package cromwell.services.metrics.bard

import cromwell.services.ServiceRegistryActor.ServiceRegistryMessage
import cromwell.services.metrics.bard.model.BardEvent

object BardEventing {
sealed trait BardEventingMessage extends ServiceRegistryMessage {
override def serviceName: String = "BardEventing"
}

case class BardEventRequest(event: BardEvent) extends BardEventingMessage

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package cromwell.services.metrics.bard

import akka.actor.ActorRef
import bio.terra.bard.api.DefaultApi
import bio.terra.bard.client.ApiClient
import bio.terra.bard.model.EventsEventLogRequest
import cats.data.NonEmptyList
import com.typesafe.scalalogging.LazyLogging
import cromwell.services.instrumentation.CromwellInstrumentation
import cromwell.services.metrics.bard.model.BardEvent
import org.apache.http.impl.client.HttpClients
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory
import org.springframework.web.client.RestTemplate

class BardService(bardUrl: String, connectionPoolSize: Int, serviceRegistry: ActorRef)
extends LazyLogging
with CromwellInstrumentation {

private val restTemplate = makeRestTemplateWithPooling
private val client = getEventApi(restTemplate)
private val appId = "cromwell"

override lazy val serviceRegistryActor: ActorRef = serviceRegistry

def sendEvent(event: BardEvent): Unit = {
try {
val eventLogRequest = new EventsEventLogRequest().properties(event.getProperties)
client.eventsEventLog(event.eventName, appId, eventLogRequest)
increment(NonEmptyList.of("send_event", "success"), Some("bard"))
} catch {
case e: Exception =>
logger.error(s"Failed to send event to Bard: ${e.getMessage}", e)
increment(NonEmptyList.of("send_event", "failure"), Some("bard"))
}
()
}

private def getEventApi(restTemplate: RestTemplate): DefaultApi = {
val bardClient = new ApiClient(restTemplate)
bardClient.setBasePath(bardUrl)
new DefaultApi(bardClient)
}

/**
* @return a new RestTemplate backed by a pooling connection manager
*/
private def makeRestTemplateWithPooling: RestTemplate = {
val poolingConnManager = new PoolingHttpClientConnectionManager()
poolingConnManager.setMaxTotal(connectionPoolSize)
poolingConnManager.setDefaultMaxPerRoute(connectionPoolSize)
val httpClient = HttpClients.custom.setConnectionManager(poolingConnManager).build
val factory = new HttpComponentsClientHttpRequestFactory(httpClient)
new RestTemplate(factory)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package cromwell.services.metrics.bard.impl

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import common.util.StringUtil.EnhancedToStringable
import cromwell.core.Dispatcher.ServiceDispatcher
import cromwell.services.metrics.bard.BardEventing.BardEventRequest
import cromwell.services.metrics.bard.{BardConfig, BardService}
import cromwell.util.GracefulShutdownHelper.ShutdownCommand

import scala.concurrent.ExecutionContext

class BardEventingActor(serviceConfig: Config, globalConfig: Config, serviceRegistry: ActorRef)
extends Actor
with LazyLogging {

implicit val system: ActorSystem = context.system
implicit val ec: ExecutionContext = context.dispatcher

lazy val bardConfig: BardConfig = BardConfig(serviceConfig)
lazy val bardService: BardService =
new BardService(bardConfig.baseUrl, bardConfig.connectionPoolSize, serviceRegistry)

override def receive: Receive = {
case BardEventRequest(event) if bardConfig.enabled => bardService.sendEvent(event)
// This service currently doesn't do any work on shutdown but the service registry pattern requires it
// (see https://github.com/broadinstitute/cromwell/issues/2575)
case ShutdownCommand => context stop self
case other =>
logger.error(
s"Programmer Error: Unexpected message ${other.toPrettyElidedString(1000)} received by ${this.self.path.name}."
)
}
}

object BardEventingActor {

def props(serviceConfig: Config, globalConfig: Config, serviceRegistryActor: ActorRef): Props =
Props(new BardEventingActor(serviceConfig, globalConfig, serviceRegistryActor))
.withDispatcher(ServiceDispatcher)
.withMailbox("akka.bard-actor-mailbox")

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package cromwell.services.metrics.bard.model

trait BardEvent extends Product {

private val baseProperties: Map[String, Any] = Map("event" -> eventName, "pushToMixpanel" -> false)

def eventName: String

def assembleScalaProperties: Map[String, Any] =
this.productElementNames.zip(this.productIterator).toMap ++ baseProperties

def getProperties: java.util.Map[String, Any]

}
Loading

0 comments on commit 748f488

Please sign in to comment.