Skip to content

Commit

Permalink
[WX-1675] Record cloud quota delay to GroupMetrics table (#7501)
Browse files Browse the repository at this point in the history
  • Loading branch information
salonishah11 authored Aug 23, 2024
1 parent 6b689a8 commit 459a6ef
Show file tree
Hide file tree
Showing 35 changed files with 405 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ trait BackendLifecycleActorFactory extends PlatformSpecific {
initializationData: Option[BackendInitializationData],
serviceRegistryActor: ActorRef,
ioActor: ActorRef,
backendSingletonActor: Option[ActorRef]
backendSingletonActor: Option[ActorRef],
groupMetricsActor: ActorRef
): Props

lazy val jobExecutionTokenType: JobTokenType = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package cromwell.backend.standard

import akka.actor.{Actor, ActorLogging, Props}
import akka.dispatch.MessageDispatcher
import common.util.StringUtil.EnhancedToStringable
import cromwell.backend.standard.GroupMetricsActor.RecordGroupQuotaExhaustion
import cromwell.core.Dispatcher
import cromwell.core.Dispatcher.EngineDispatcher
import cromwell.database.sql.EngineSqlDatabase
import cromwell.database.sql.SqlConverters.OffsetDateTimeToSystemTimestamp
import cromwell.database.sql.tables.GroupMetricsEntry

import java.time.OffsetDateTime

class GroupMetricsActor(engineDbInterface: EngineSqlDatabase) extends Actor with ActorLogging {

implicit val ec: MessageDispatcher = context.system.dispatchers.lookup(Dispatcher.EngineDispatcher)

override def receive: Receive = {
case RecordGroupQuotaExhaustion(group) =>
val groupMetricsEntry = GroupMetricsEntry(group, OffsetDateTime.now.toSystemTimestamp)
engineDbInterface.recordGroupMetricsEntry(groupMetricsEntry)
()
case other =>
log.error(
s"Programmer Error: Unexpected message ${other.toPrettyElidedString(1000)} received by ${this.self.path.name}."
)
}
}

object GroupMetricsActor {

sealed trait GroupMetricsActorMessage

case class RecordGroupQuotaExhaustion(group: String) extends GroupMetricsActorMessage

def props(engineDbInterface: EngineSqlDatabase): Props =
Props(new GroupMetricsActor(engineDbInterface)).withDispatcher(EngineDispatcher)
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ import wom.graph.LocalName
import wom.values._
import wom.{CommandSetupSideEffectFile, InstantiatedCommand, WomFileMapper}

import java.io.IOException
import java.time.OffsetDateTime
import java.time.temporal.ChronoUnit
import java.io.IOException
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
Expand All @@ -68,6 +68,7 @@ case class DefaultStandardAsyncExecutionActorParams(
override val backendInitializationDataOption: Option[BackendInitializationData],
override val backendSingletonActorOption: Option[ActorRef],
override val completionPromise: Promise[BackendJobExecutionResponse],
override val groupMetricsActor: ActorRef,
override val minimumRuntimeSettings: MinimumRuntimeSettings
) extends StandardAsyncExecutionActorParams

Expand Down Expand Up @@ -1057,6 +1058,12 @@ trait StandardAsyncExecutionActor
*/
def retryEvaluateOutputs(exception: Exception): Boolean = false

/**
* Checks if the job has run into any cloud quota exhaustion and records it to GroupMetrics table
* @param runStatus The run status
*/
def checkAndRecordQuotaExhaustion(runStatus: StandardAsyncRunState): Unit = ()

/**
* Process a successful run, as defined by `isSuccess`.
*
Expand Down Expand Up @@ -1329,6 +1336,9 @@ trait StandardAsyncExecutionActor
tellMetadata(Map(CallMetadataKeys.BackendStatus -> state))
}

// record if group has run into cloud quota exhaustion
checkAndRecordQuotaExhaustion(state)

state match {
case _ if isTerminal(state) =>
val metadata = getTerminalMetadata(state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ trait StandardJobExecutionActorParams {
/** The singleton actor. */
def backendSingletonActorOption: Option[ActorRef]

/** Singleton actor for recording when hog group runs into quota exhaustion */
def groupMetricsActor: ActorRef

/** The default settings for runtime Environment passed to CWL expressions when not specified in the Resource Requirements */
val minimumRuntimeSettings: MinimumRuntimeSettings
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,15 @@ trait StandardLifecycleActorFactory extends BackendLifecycleActorFactory {
initializationDataOption: Option[BackendInitializationData],
serviceRegistryActor: ActorRef,
ioActor: ActorRef,
backendSingletonActorOption: Option[ActorRef]
backendSingletonActorOption: Option[ActorRef],
groupMetricsActor: ActorRef
): Props = {
val params = jobExecutionActorParams(jobDescriptor,
initializationDataOption,
serviceRegistryActor,
ioActor,
backendSingletonActorOption
backendSingletonActorOption,
groupMetricsActor
)
Props(new StandardSyncExecutionActor(params)).withDispatcher(Dispatcher.BackendDispatcher)
}
Expand All @@ -126,7 +128,8 @@ trait StandardLifecycleActorFactory extends BackendLifecycleActorFactory {
initializationDataOption: Option[BackendInitializationData],
serviceRegistryActor: ActorRef,
ioActor: ActorRef,
backendSingletonActorOption: Option[ActorRef]
backendSingletonActorOption: Option[ActorRef],
groupMetricsActor: ActorRef
): StandardSyncExecutionActorParams =
DefaultStandardSyncExecutionActorParams(
jobIdKey,
Expand All @@ -137,6 +140,7 @@ trait StandardLifecycleActorFactory extends BackendLifecycleActorFactory {
initializationDataOption,
backendSingletonActorOption,
asyncExecutionActorClass,
groupMetricsActor,
MinimumRuntimeSettings()
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ case class DefaultStandardSyncExecutionActorParams(
override val backendInitializationDataOption: Option[BackendInitializationData],
override val backendSingletonActorOption: Option[ActorRef],
override val asyncJobExecutionActorClass: Class[_ <: StandardAsyncExecutionActor],
override val groupMetricsActor: ActorRef,
override val minimumRuntimeSettings: MinimumRuntimeSettings
) extends StandardSyncExecutionActorParams

Expand Down Expand Up @@ -150,6 +151,7 @@ class StandardSyncExecutionActor(val standardParams: StandardSyncExecutionActorP
standardParams.backendInitializationDataOption,
standardParams.backendSingletonActorOption,
completionPromise,
standardParams.groupMetricsActor,
standardParams.minimumRuntimeSettings
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package cromwell.backend.standard

import akka.actor.ActorSystem
import akka.testkit.{TestActorRef, TestProbe}
import com.typesafe.config.{Config, ConfigFactory}
import cromwell.backend.standard.GroupMetricsActor.RecordGroupQuotaExhaustion
import cromwell.database.slick.EngineSlickDatabase
import cromwell.database.sql.tables.GroupMetricsEntry
import cromwell.services.EngineServicesStore
import cromwell.services.ServicesStore.EnhancedSqlDatabase
import org.scalatest.concurrent.Eventually.eventually
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import scala.concurrent.{ExecutionContext, Future}

class GroupMetricsActorSpec extends AnyFlatSpec with Matchers {

implicit val system: ActorSystem = ActorSystem("GroupMetricsActorSpec")

val testHogGroup: String = "groot-hog-group"
val DatabaseConfig: Config = ConfigFactory.load.getConfig("database")
var recordMethodCallCount: Int = 0

def databaseInterface(): EngineSlickDatabase =
new EngineSlickDatabase(DatabaseConfig) {
override def recordGroupMetricsEntry(groupMetricsEntry: GroupMetricsEntry)(implicit
ec: ExecutionContext
): Future[Unit] = {
recordMethodCallCount = recordMethodCallCount + 1
Future.successful(())
}
}.initialized(EngineServicesStore.EngineLiquibaseSettings)

behavior of "GroupMetricsActor"

it should "receive new quota exhaustion message and call database function" in {
val db = databaseInterface()
val mockGroupMetricsActor = TestActorRef(GroupMetricsActor.props(db))

mockGroupMetricsActor.tell(RecordGroupQuotaExhaustion(testHogGroup), TestProbe().ref)

eventually {
recordMethodCallCount shouldBe 1
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class EngineSlickDatabase(originalDatabaseConfig: Config)
with JobStoreSlickDatabase
with CallCachingSlickDatabase
with SubWorkflowStoreSlickDatabase
with DockerHashStoreSlickDatabase {
with DockerHashStoreSlickDatabase
with GroupMetricsSlickDatabase {
override lazy val dataAccess = new EngineDataAccessComponent(slickConfig.profile)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package cromwell.database.slick

import cromwell.database.sql.GroupMetricsSqlDatabase
import cromwell.database.sql.tables.GroupMetricsEntry

import scala.concurrent.{ExecutionContext, Future}

trait GroupMetricsSlickDatabase extends GroupMetricsSqlDatabase {
this: EngineSlickDatabase =>

import dataAccess.driver.api._

override def recordGroupMetricsEntry(
groupMetricsEntry: GroupMetricsEntry
)(implicit ec: ExecutionContext): Future[Unit] = {
val action = for {
updateCount <- dataAccess
.quotaExhaustionForGroupId(groupMetricsEntry.groupId)
.update(groupMetricsEntry.quotaExhaustionDetected)
_ <- updateCount match {
case 0 => dataAccess.groupMetricsEntryIdsAutoInc += groupMetricsEntry
case _ => assertUpdateCount("recordGroupMetricsEntry", updateCount, 1)
}
} yield ()
runTransaction(action)
}

override def countGroupMetricsEntries(groupId: String)(implicit ec: ExecutionContext): Future[Int] = {
val action = dataAccess.countGroupMetricsEntriesForGroupId(groupId).result
runTransaction(action)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,19 @@ trait GroupMetricsEntryComponent {

val groupMetricsEntryIdsAutoInc = groupMetricsEntries returning groupMetricsEntries.map(_.groupMetricsEntryId)

val quotaExhaustionForGroupId = Compiled((groupId: Rep[String]) =>
for {
groupMetricsEntry <- groupMetricsEntries
if groupMetricsEntry.groupId === groupId
} yield groupMetricsEntry.quotaExhaustionDetected
)

val countGroupMetricsEntriesForGroupId = Compiled((groupId: Rep[String]) =>
{
for {
groupMetricsEntry <- groupMetricsEntries
if groupMetricsEntry.groupId === groupId
} yield groupMetricsEntry
}.size
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ trait EngineSqlDatabase
with WorkflowStoreSqlDatabase
with SubWorkflowStoreSqlDatabase
with DockerHashStoreSqlDatabase
with GroupMetricsSqlDatabase
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package cromwell.database.sql

import cromwell.database.sql.tables.GroupMetricsEntry

import scala.concurrent.{ExecutionContext, Future}

trait GroupMetricsSqlDatabase {

this: SqlDatabase =>

/**
* Insert or update Group Metrics entry to the table
*
*/
def recordGroupMetricsEntry(groupMetricsEntry: GroupMetricsEntry)(implicit ec: ExecutionContext): Future[Unit]

/**
* Returns number of entries associated with given group
*/
def countGroupMetricsEntries(groupId: String)(implicit ec: ExecutionContext): Future[Int]
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ object WorkflowActor {
workflowHeartbeatConfig: WorkflowHeartbeatConfig,
totalJobsByRootWf: AtomicInteger,
fileHashCacheActorProps: Option[Props],
blacklistCache: Option[BlacklistCache]
blacklistCache: Option[BlacklistCache],
groupMetricsActor: ActorRef
): Props =
Props(
new WorkflowActor(
Expand All @@ -247,7 +248,8 @@ object WorkflowActor {
workflowHeartbeatConfig = workflowHeartbeatConfig,
totalJobsByRootWf = totalJobsByRootWf,
fileHashCacheActorProps = fileHashCacheActorProps,
blacklistCache = blacklistCache
blacklistCache = blacklistCache,
groupMetricsActor = groupMetricsActor
)
).withDispatcher(EngineDispatcher)
}
Expand Down Expand Up @@ -279,7 +281,8 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
// child of this actor. The sbt subproject of `RootWorkflowFileHashCacheActor` is not visible from
// the subproject this class belongs to so the `Props` are passed in.
fileHashCacheActorProps: Option[Props],
blacklistCache: Option[BlacklistCache]
blacklistCache: Option[BlacklistCache],
groupMetricsActor: ActorRef
) extends LoggingFSM[WorkflowActorState, WorkflowActorData]
with WorkflowLogging
with WorkflowMetadataHelper
Expand Down Expand Up @@ -450,7 +453,8 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
rootConfig = conf,
totalJobsByRootWf = totalJobsByRootWf,
fileHashCacheActor = fileHashCacheActorProps map context.system.actorOf,
blacklistCache = blacklistCache
blacklistCache = blacklistCache,
groupMetricsActor = groupMetricsActor
),
name = s"WorkflowExecutionActor-$workflowId"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ object WorkflowManagerActor {
jobExecutionTokenDispenserActor: ActorRef,
backendSingletonCollection: BackendSingletonCollection,
serverMode: Boolean,
workflowHeartbeatConfig: WorkflowHeartbeatConfig
workflowHeartbeatConfig: WorkflowHeartbeatConfig,
groupMetricsActor: ActorRef
): Props = {
val params = WorkflowManagerActorParams(
config = config,
Expand All @@ -89,7 +90,8 @@ object WorkflowManagerActor {
jobExecutionTokenDispenserActor = jobExecutionTokenDispenserActor,
backendSingletonCollection = backendSingletonCollection,
serverMode = serverMode,
workflowHeartbeatConfig = workflowHeartbeatConfig
workflowHeartbeatConfig = workflowHeartbeatConfig,
groupMetricsActor = groupMetricsActor
)
Props(new WorkflowManagerActor(params)).withDispatcher(EngineDispatcher)
}
Expand Down Expand Up @@ -139,7 +141,8 @@ case class WorkflowManagerActorParams(config: Config,
jobExecutionTokenDispenserActor: ActorRef,
backendSingletonCollection: BackendSingletonCollection,
serverMode: Boolean,
workflowHeartbeatConfig: WorkflowHeartbeatConfig
workflowHeartbeatConfig: WorkflowHeartbeatConfig,
groupMetricsActor: ActorRef
)

class WorkflowManagerActor(params: WorkflowManagerActorParams)
Expand Down Expand Up @@ -351,7 +354,8 @@ class WorkflowManagerActor(params: WorkflowManagerActorParams)
workflowHeartbeatConfig = params.workflowHeartbeatConfig,
totalJobsByRootWf = new AtomicInteger(),
fileHashCacheActorProps = fileHashCacheActorProps,
blacklistCache = callCachingBlacklistManager.blacklistCacheFor(workflow)
blacklistCache = callCachingBlacklistManager.blacklistCacheFor(workflow),
groupMetricsActor = params.groupMetricsActor
)
val wfActor = context.actorOf(wfProps, name = s"WorkflowActor-$workflowId")

Expand Down
Loading

0 comments on commit 459a6ef

Please sign in to comment.