diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 80fb37f50..eeaa5fd7b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -91,11 +91,11 @@ jobs: - name: Make target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: mkdir -p semconv/stable/.jvm/target oteljava/metrics/target sdk-exporter/common/.js/target sdk/common/native/target sdk/common/js/target core/trace/.js/target semconv/metrics/stable/.jvm/target semconv/metrics/experimental/.jvm/target semconv/metrics/stable/.native/target sdk-exporter/all/.jvm/target sdk-exporter/prometheus/.js/target semconv/experimental/.js/target sdk/trace/.js/target core/common/.jvm/target oteljava/common-testkit/target sdk/metrics/.native/target sdk-exporter/metrics/.jvm/target sdk-exporter/trace/.jvm/target unidocs/target sdk-contrib/aws/resource/.jvm/target oteljava/trace-testkit/target core/metrics/.native/target core/all/.native/target sdk/trace-testkit/.jvm/target sdk/trace-testkit/.native/target sdk/testkit/.native/target sdk-exporter/prometheus/.jvm/target sdk-contrib/aws/resource/.js/target semconv/experimental/.native/target core/metrics/.jvm/target core/all/.js/target sdk-exporter/proto/.jvm/target sdk-exporter/proto/.js/target sdk-exporter/metrics/.js/target semconv/stable/.native/target sdk/all/.native/target sdk/metrics-testkit/.js/target sdk-contrib/aws/xray-propagator/.native/target core/metrics/.js/target sdk/testkit/.js/target core/all/.jvm/target sdk/common/jvm/target core/trace/.native/target oteljava/metrics-testkit/target sdk/trace/.native/target semconv/experimental/.jvm/target sdk/metrics-testkit/.native/target sdk/metrics/.jvm/target oteljava/common/target scalafix/rules/target sdk-exporter/proto/.native/target core/trace/.jvm/target sdk-exporter/common/.jvm/target sdk/metrics-testkit/.jvm/target sdk/metrics/.js/target sdk-exporter/trace/.js/target core/common/.native/target sdk/trace-testkit/.js/target core/common/.js/target oteljava/trace/target semconv/metrics/experimental/.native/target oteljava/testkit/target sdk/testkit/.jvm/target sdk-exporter/all/.js/target sdk-contrib/aws/xray/.native/target sdk-contrib/aws/xray/.js/target sdk-contrib/aws/xray-propagator/.js/target semconv/metrics/experimental/.js/target semconv/metrics/stable/.js/target sdk/all/.js/target sdk/all/.jvm/target sdk-exporter/all/.native/target oteljava/all/target sdk/trace/.jvm/target sdk-contrib/aws/xray-propagator/.jvm/target semconv/stable/.js/target sdk-contrib/aws/xray/.jvm/target project/target + run: mkdir -p semconv/stable/.jvm/target oteljava/metrics/target instrumentation/metrics/js/target sdk-exporter/common/.js/target sdk/common/native/target sdk/common/js/target core/trace/.js/target semconv/metrics/stable/.jvm/target semconv/metrics/experimental/.jvm/target semconv/metrics/stable/.native/target sdk-exporter/all/.jvm/target sdk-exporter/prometheus/.js/target semconv/experimental/.js/target sdk/trace/.js/target core/common/.jvm/target oteljava/common-testkit/target sdk/metrics/.native/target sdk-exporter/metrics/.jvm/target sdk-exporter/trace/.jvm/target unidocs/target sdk-contrib/aws/resource/.jvm/target oteljava/trace-testkit/target core/metrics/.native/target core/all/.native/target sdk/trace-testkit/.jvm/target sdk/trace-testkit/.native/target sdk/testkit/.native/target sdk-exporter/prometheus/.jvm/target sdk-contrib/aws/resource/.js/target semconv/experimental/.native/target core/metrics/.jvm/target core/all/.js/target sdk-exporter/proto/.jvm/target sdk-exporter/proto/.js/target sdk-exporter/metrics/.js/target semconv/stable/.native/target sdk/all/.native/target sdk/metrics-testkit/.js/target sdk-contrib/aws/xray-propagator/.native/target core/metrics/.js/target sdk/testkit/.js/target core/all/.jvm/target sdk/common/jvm/target core/trace/.native/target oteljava/metrics-testkit/target instrumentation/metrics/jvm/target sdk/trace/.native/target semconv/experimental/.jvm/target sdk/metrics-testkit/.native/target sdk/metrics/.jvm/target oteljava/common/target scalafix/rules/target sdk-exporter/proto/.native/target core/trace/.jvm/target sdk-exporter/common/.jvm/target sdk/metrics-testkit/.jvm/target sdk/metrics/.js/target sdk-exporter/trace/.js/target core/common/.native/target sdk/trace-testkit/.js/target core/common/.js/target oteljava/trace/target semconv/metrics/experimental/.native/target oteljava/testkit/target sdk/testkit/.jvm/target sdk-exporter/all/.js/target sdk-contrib/aws/xray/.native/target sdk-contrib/aws/xray/.js/target sdk-contrib/aws/xray-propagator/.js/target semconv/metrics/experimental/.js/target semconv/metrics/stable/.js/target instrumentation/metrics/native/target sdk/all/.js/target sdk/all/.jvm/target sdk-exporter/all/.native/target oteljava/all/target sdk/trace/.jvm/target sdk-contrib/aws/xray-propagator/.jvm/target semconv/stable/.js/target sdk-contrib/aws/xray/.jvm/target project/target - name: Compress target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: tar cf targets.tar semconv/stable/.jvm/target oteljava/metrics/target sdk-exporter/common/.js/target sdk/common/native/target sdk/common/js/target core/trace/.js/target semconv/metrics/stable/.jvm/target semconv/metrics/experimental/.jvm/target semconv/metrics/stable/.native/target sdk-exporter/all/.jvm/target sdk-exporter/prometheus/.js/target semconv/experimental/.js/target sdk/trace/.js/target core/common/.jvm/target oteljava/common-testkit/target sdk/metrics/.native/target sdk-exporter/metrics/.jvm/target sdk-exporter/trace/.jvm/target unidocs/target sdk-contrib/aws/resource/.jvm/target oteljava/trace-testkit/target core/metrics/.native/target core/all/.native/target sdk/trace-testkit/.jvm/target sdk/trace-testkit/.native/target sdk/testkit/.native/target sdk-exporter/prometheus/.jvm/target sdk-contrib/aws/resource/.js/target semconv/experimental/.native/target core/metrics/.jvm/target core/all/.js/target sdk-exporter/proto/.jvm/target sdk-exporter/proto/.js/target sdk-exporter/metrics/.js/target semconv/stable/.native/target sdk/all/.native/target sdk/metrics-testkit/.js/target sdk-contrib/aws/xray-propagator/.native/target core/metrics/.js/target sdk/testkit/.js/target core/all/.jvm/target sdk/common/jvm/target core/trace/.native/target oteljava/metrics-testkit/target sdk/trace/.native/target semconv/experimental/.jvm/target sdk/metrics-testkit/.native/target sdk/metrics/.jvm/target oteljava/common/target scalafix/rules/target sdk-exporter/proto/.native/target core/trace/.jvm/target sdk-exporter/common/.jvm/target sdk/metrics-testkit/.jvm/target sdk/metrics/.js/target sdk-exporter/trace/.js/target core/common/.native/target sdk/trace-testkit/.js/target core/common/.js/target oteljava/trace/target semconv/metrics/experimental/.native/target oteljava/testkit/target sdk/testkit/.jvm/target sdk-exporter/all/.js/target sdk-contrib/aws/xray/.native/target sdk-contrib/aws/xray/.js/target sdk-contrib/aws/xray-propagator/.js/target semconv/metrics/experimental/.js/target semconv/metrics/stable/.js/target sdk/all/.js/target sdk/all/.jvm/target sdk-exporter/all/.native/target oteljava/all/target sdk/trace/.jvm/target sdk-contrib/aws/xray-propagator/.jvm/target semconv/stable/.js/target sdk-contrib/aws/xray/.jvm/target project/target + run: tar cf targets.tar semconv/stable/.jvm/target oteljava/metrics/target instrumentation/metrics/js/target sdk-exporter/common/.js/target sdk/common/native/target sdk/common/js/target core/trace/.js/target semconv/metrics/stable/.jvm/target semconv/metrics/experimental/.jvm/target semconv/metrics/stable/.native/target sdk-exporter/all/.jvm/target sdk-exporter/prometheus/.js/target semconv/experimental/.js/target sdk/trace/.js/target core/common/.jvm/target oteljava/common-testkit/target sdk/metrics/.native/target sdk-exporter/metrics/.jvm/target sdk-exporter/trace/.jvm/target unidocs/target sdk-contrib/aws/resource/.jvm/target oteljava/trace-testkit/target core/metrics/.native/target core/all/.native/target sdk/trace-testkit/.jvm/target sdk/trace-testkit/.native/target sdk/testkit/.native/target sdk-exporter/prometheus/.jvm/target sdk-contrib/aws/resource/.js/target semconv/experimental/.native/target core/metrics/.jvm/target core/all/.js/target sdk-exporter/proto/.jvm/target sdk-exporter/proto/.js/target sdk-exporter/metrics/.js/target semconv/stable/.native/target sdk/all/.native/target sdk/metrics-testkit/.js/target sdk-contrib/aws/xray-propagator/.native/target core/metrics/.js/target sdk/testkit/.js/target core/all/.jvm/target sdk/common/jvm/target core/trace/.native/target oteljava/metrics-testkit/target instrumentation/metrics/jvm/target sdk/trace/.native/target semconv/experimental/.jvm/target sdk/metrics-testkit/.native/target sdk/metrics/.jvm/target oteljava/common/target scalafix/rules/target sdk-exporter/proto/.native/target core/trace/.jvm/target sdk-exporter/common/.jvm/target sdk/metrics-testkit/.jvm/target sdk/metrics/.js/target sdk-exporter/trace/.js/target core/common/.native/target sdk/trace-testkit/.js/target core/common/.js/target oteljava/trace/target semconv/metrics/experimental/.native/target oteljava/testkit/target sdk/testkit/.jvm/target sdk-exporter/all/.js/target sdk-contrib/aws/xray/.native/target sdk-contrib/aws/xray/.js/target sdk-contrib/aws/xray-propagator/.js/target semconv/metrics/experimental/.js/target semconv/metrics/stable/.js/target instrumentation/metrics/native/target sdk/all/.js/target sdk/all/.jvm/target sdk-exporter/all/.native/target oteljava/all/target sdk/trace/.jvm/target sdk-contrib/aws/xray-propagator/.jvm/target semconv/stable/.js/target sdk-contrib/aws/xray/.jvm/target project/target - name: Upload target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') diff --git a/build.sbt b/build.sbt index a0a21f021..26f375122 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,6 @@ import com.typesafe.tools.mima.core._ -ThisBuild / tlBaseVersion := "0.11" +ThisBuild / tlBaseVersion := "0.12" ThisBuild / organization := "org.typelevel" ThisBuild / organizationName := "Typelevel" @@ -76,7 +76,7 @@ ThisBuild / mergifyPrRules ++= Seq( ) val CatsVersion = "2.11.0" -val CatsEffectVersion = "3.5.7" +val CatsEffectVersion = "3.6.0-RC1" val CatsMtlVersion = "1.4.0" val FS2Version = "3.11.0" val MUnitVersion = "1.0.0" @@ -130,6 +130,7 @@ lazy val root = tlCrossRootProject `core-metrics`, `core-trace`, core, + `instrumentation-metrics`, `sdk-common`, `sdk-metrics`, `sdk-metrics-testkit`, @@ -235,6 +236,24 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) ) .settings(scalafixSettings) +// +// Instrumentation +// + +lazy val `instrumentation-metrics` = crossProject(JVMPlatform, JSPlatform, NativePlatform) + .crossType(CrossType.Full) + .in(file("instrumentation/metrics")) + .dependsOn(`core-metrics`, `core-common` % "test->test", `sdk-metrics-testkit` % Test) + .settings(munitDependencies) + .settings( + name := "otel4s-instrumentation-metrics", + startYear := Some(2024), + libraryDependencies ++= Seq( + "org.typelevel" %%% "scalacheck-effect-munit" % MUnitScalaCheckEffectVersion % Test + ) + ) + .settings(scalafixSettings) + // // SDK // @@ -820,6 +839,7 @@ lazy val docs = project .dependsOn( oteljava, `oteljava-testkit`, + `instrumentation-metrics`.jvm, sdk.jvm, `sdk-exporter`.jvm, `sdk-exporter-prometheus`.jvm, @@ -889,6 +909,7 @@ lazy val unidocs = project `core-metrics`.jvm, `core-trace`.jvm, core.jvm, + `instrumentation-metrics`.jvm, `sdk-common`.jvm, `sdk-metrics`.jvm, `sdk-metrics-testkit`.jvm, diff --git a/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/BatchCallback.scala b/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/BatchCallback.scala index e90af39d5..069d8b7dc 100644 --- a/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/BatchCallback.scala +++ b/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/BatchCallback.scala @@ -51,7 +51,7 @@ trait BatchCallback[F[_]] { * }}} * * @param callback - * the callback to to observe values on-demand + * the callback to observe values on-demand * * @param observable * the instrument for which the callback may observe values diff --git a/docs/instrumentation/directory.conf b/docs/instrumentation/directory.conf index 563e85cfe..83c8e3255 100644 --- a/docs/instrumentation/directory.conf +++ b/docs/instrumentation/directory.conf @@ -2,6 +2,7 @@ laika.title = Instrumentation laika.navigationOrder = [ metrics.md + metrics-cats-effect-io-runtime.md tracing.md tracing-cross-service-propagation.md ] diff --git a/docs/instrumentation/metrics-cats-effect-io-runtime.md b/docs/instrumentation/metrics-cats-effect-io-runtime.md new file mode 100644 index 000000000..2b25f4add --- /dev/null +++ b/docs/instrumentation/metrics-cats-effect-io-runtime.md @@ -0,0 +1,573 @@ +# Metrics | Cats Effect IO runtime + +## Available metrics + +```scala mdoc:invisible +import cats.effect.IO +import cats.effect.unsafe.implicits.global +import org.typelevel.otel4s.instrumentation.ce.IORuntimeMetrics +import org.typelevel.otel4s.metrics.MeterProvider +import org.typelevel.otel4s.sdk.testkit.metrics.MetricsTestkit +import IORuntimeMetrics.Config._ + +def printMetrics(config: IORuntimeMetrics.Config): Unit = { + val metrics = MetricsTestkit.inMemory[IO]().use { testkit => + implicit val mp: MeterProvider[IO] = testkit.meterProvider + + IORuntimeMetrics + .register[IO](global.metrics, config) + .surround(testkit.collectMetrics) + }.unsafeRunSync() + + println("| Name | Description | Unit |") + println("|-|-|-|") + println(metrics.sortBy(_.name).map(m => s"${m.name} | ${m.description.getOrElse("")} | ${m.unit.getOrElse("")}").mkString("\n")) +} +``` + +### CPU Starvation + +**Platforms**: JVM, Scala.js, Scala Native. + +These metrics could help identify performance bottlenecks caused by an overloaded compute pool, +excessive task scheduling, or lack of CPU resources. + +```scala mdoc:passthrough +printMetrics(IORuntimeMetrics.Config(CpuStarvationConfig.enabled, WorkStealingThreadPoolConfig.disabled)) +``` + +### Work-stealing thread pool - compute + +**Platforms**: JVM. + +**Built-in attributes**: +* `pool.id` - the id of the work-stealing thread pool + +These metrics provide insights about fibers and threads within the compute pool. +They help diagnose load distribution, identify bottlenecks, and monitor the pool’s efficiency in handling tasks. + +```scala mdoc:passthrough +printMetrics( + IORuntimeMetrics.Config( + CpuStarvationConfig.disabled, + WorkStealingThreadPoolConfig( + WorkStealingThreadPoolConfig.ComputeConfig.enabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.disabled, + ) + ) +) +``` + +### Work-stealing thread pool - thread + +**Platforms**: JVM. + +**Built-in attributes**: +* `pool.id` - the id of the work-stealing thread pool the worker is used by +* `worker.index` - the index of the worker thread +* `thread.event` - the thread event + * `parked` - a thread is parked + * `polled` - a thread is polled for I/O events + * `blocked` - a thread is switched to a blocking thread and been replaced + * `respawn` - a thread is replaced by a newly spawned thread + +These metrics provide detailed information about threads state within the compute pool. + +```scala mdoc:passthrough +printMetrics( + IORuntimeMetrics.Config( + CpuStarvationConfig.disabled, + WorkStealingThreadPoolConfig( + WorkStealingThreadPoolConfig.ComputeConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig( + WorkStealingThreadPoolConfig.WorkerThreadsConfig.ThreadConfig.enabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.LocalQueueConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.TimerHeapConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.PollerConfig.disabled + ), + ) + ) +) +``` + +### Work-stealing thread pool - local queue + +**Platforms**: JVM. + +**Built-in attributes**: +* `pool.id` - the id of the work-stealing thread pool the queue is used by +* `worker.index` - the index of the worker thread the queue is used by + +These metrics provide a detailed view of fiber distribution within the pool. They help diagnose +load imbalances and system inefficiency. + +```scala mdoc:passthrough +printMetrics( + IORuntimeMetrics.Config( + CpuStarvationConfig.disabled, + WorkStealingThreadPoolConfig( + WorkStealingThreadPoolConfig.ComputeConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig( + WorkStealingThreadPoolConfig.WorkerThreadsConfig.ThreadConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.LocalQueueConfig.enabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.TimerHeapConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.PollerConfig.disabled + ), + ) + ) +) +``` + +### Work-stealing thread pool - timer heap + +**Platforms**: JVM. + +**Built-in attributes**: +* `pool.id` - the id of the work-stealing thread pool the timer heap is used by +* `worker.index` - the index of the worker thread the timer heap is used by +* `timer.state` - the state of the timer + * `executed` - the successfully executed timer + * `scheduled` - the scheduled timer + * `canceled` - the canceled timer + +These metrics provide a detailed view of timer stats within the pool. + +```scala mdoc:passthrough +printMetrics( + IORuntimeMetrics.Config( + CpuStarvationConfig.disabled, + WorkStealingThreadPoolConfig( + WorkStealingThreadPoolConfig.ComputeConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig( + WorkStealingThreadPoolConfig.WorkerThreadsConfig.ThreadConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.LocalQueueConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.TimerHeapConfig.enabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.PollerConfig.disabled + ), + ) + ) +) +``` + +### Work-stealing thread pool - poller + +**Platforms**: JVM. + +**Built-in attributes**: +* `pool.id` - the id of the work-stealing thread pool the poller is used by +* `worker.index` - the index of the worker thread the poller is used by +* `poller.operation` - the operation performed by the poller + * `accept` + * `connect` + * `read` + * `write` +* `poller.operation.status` - the status of the operation + * `submitted` - the operation has been submitted + * `succeeded` - the operation has errored + * `errored` - the operation has errored + * `canceled` - the operation has been canceled + +These metrics provide a detailed view of poller stats within the pool. + +```scala mdoc:passthrough +printMetrics( + IORuntimeMetrics.Config( + CpuStarvationConfig.disabled, + WorkStealingThreadPoolConfig( + WorkStealingThreadPoolConfig.ComputeConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig( + WorkStealingThreadPoolConfig.WorkerThreadsConfig.ThreadConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.LocalQueueConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.TimerHeapConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.PollerConfig.enabled + ), + ) + ) +) +``` + +## Getting started + +Add the following configuration to the favorite build tool: + +@:select(build-tool) + +@:choice(sbt) + +Add settings to the `build.sbt`: + +```scala +libraryDependencies ++= Seq( + "org.typelevel" %%% "otel4s-instrumentation-metrics" % "@VERSION@" // <1> +) +``` + +@:choice(scala-cli) + +Add directives to the `*.scala` file: + +```scala +//> using dep "org.typelevel::otel4s-instrumentation-metrics::@VERSION@" // <1> +``` + +@:@ + +1. Add the `otel4s-instrumentation-metrics` library + +## Registering metrics collectors + +`IORuntimeMetrics.register` takes care of the metrics lifecycle management. + +@:select(otel-backend) + +@:choice(oteljava) + +```scala mdoc:reset:silent +import cats.effect._ +import org.typelevel.otel4s.instrumentation.ce.IORuntimeMetrics +import org.typelevel.otel4s.metrics.MeterProvider +import org.typelevel.otel4s.trace.TracerProvider +import org.typelevel.otel4s.oteljava.OtelJava + +object Main extends IOApp.Simple { + + def run: IO[Unit] = + OtelJava.autoConfigured[IO]().use { otel4s => + implicit val mp: MeterProvider[IO] = otel4s.meterProvider + IORuntimeMetrics + .register[IO](runtime.metrics, IORuntimeMetrics.Config.default) + .surround { + program(otel4s.meterProvider, otel4s.tracerProvider) + } + } + + def program( + meterProvider: MeterProvider[IO], + tracerProvider: TracerProvider[IO] + ): IO[Unit] = { + val _ = (meterProvider, tracerProvider) + IO.unit + } + +} +``` + +@:choice(sdk) + +```scala mdoc:reset:silent +import cats.effect._ +import org.typelevel.otel4s.instrumentation.ce.IORuntimeMetrics +import org.typelevel.otel4s.metrics.MeterProvider +import org.typelevel.otel4s.trace.TracerProvider +import org.typelevel.otel4s.sdk.OpenTelemetrySdk + +object Main extends IOApp.Simple { + + def run: IO[Unit] = + OpenTelemetrySdk.autoConfigured[IO]().use { autoConfigured => + val sdk = autoConfigured.sdk + implicit val mp: MeterProvider[IO] = sdk.meterProvider + IORuntimeMetrics + .register[IO](runtime.metrics, IORuntimeMetrics.Config.default) + .surround { + program(sdk.meterProvider, sdk.tracerProvider) + } + } + + def program( + meterProvider: MeterProvider[IO], + tracerProvider: TracerProvider[IO] + ): IO[Unit] = { + val _ = (meterProvider, tracerProvider) + IO.unit + } + +} +``` + +@:@ + + +## Customization + +The behavior of the `IORuntimeMetrics.register` can be customized via `IORuntimeMetrics.Config`. + +### CPU Starvation + +```scala mdoc:reset:invisible +import cats.effect.IO +import org.typelevel.otel4s.{Attribute, Attributes} +import org.typelevel.otel4s.instrumentation.ce.IORuntimeMetrics +import org.typelevel.otel4s.metrics.MeterProvider + +val runtime = cats.effect.unsafe.implicits.global +implicit val mp: MeterProvider[IO] = MeterProvider.noop[IO] +``` + +To disable CPU starvation metrics: +```scala mdoc:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + IORuntimeMetrics.Config( + CpuStarvationConfig.disabled, // disable CPU starvation metrics + WorkStealingThreadPoolConfig.enabled + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +To attach attributes to CPU starvation metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled( + Attributes(Attribute("key", "value")) // the attributes + ), + WorkStealingThreadPoolConfig.enabled + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +### Work-stealing thread pool - compute + +To disable worker metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + import WorkStealingThreadPoolConfig._ + + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled, + WorkStealingThreadPoolConfig( + ComputeConfig.disabled, // disable compute metrics + WorkStealingThreadPoolConfig.WorkerThreadsConfig.enabled + ) + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +To attach attributes to compute metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + import WorkStealingThreadPoolConfig._ + + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled, + WorkStealingThreadPoolConfig( + ComputeConfig.enabled( + Attributes(Attribute("key", "value")) // attributes + ), + WorkStealingThreadPoolConfig.WorkerThreadsConfig.enabled + ) + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +### Work-stealing thread pool - thread + +To disable thread metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + import WorkStealingThreadPoolConfig._ + + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled, + WorkStealingThreadPoolConfig( + ComputeConfig.enabled, + WorkerThreadsConfig( + WorkerThreadsConfig.ThreadConfig.disabled, // disable worker thread metrics + WorkerThreadsConfig.LocalQueueConfig.enabled, + WorkerThreadsConfig.TimerHeapConfig.enabled, + WorkerThreadsConfig.PollerConfig.enabled + ) + ) + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +To attach attributes to thread metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + import WorkStealingThreadPoolConfig._ + + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled, + WorkStealingThreadPoolConfig( + ComputeConfig.enabled, + WorkerThreadsConfig( + WorkerThreadsConfig.ThreadConfig.enabled( + Attributes(Attribute("key", "value")) // the attributes + ), + WorkerThreadsConfig.LocalQueueConfig.enabled, + WorkerThreadsConfig.TimerHeapConfig.enabled, + WorkerThreadsConfig.PollerConfig.enabled + ) + ) + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +### Work-stealing thread pool - local queue + +To disable local queue metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + import WorkStealingThreadPoolConfig._ + + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled, + WorkStealingThreadPoolConfig( + ComputeConfig.enabled, + WorkerThreadsConfig( + WorkerThreadsConfig.ThreadConfig.enabled, + WorkerThreadsConfig.LocalQueueConfig.disabled, // disable local queue metrics + WorkerThreadsConfig.TimerHeapConfig.enabled, + WorkerThreadsConfig.PollerConfig.enabled + ) + ) + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +To attach attributes to local queue metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + import WorkStealingThreadPoolConfig._ + + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled, + WorkStealingThreadPoolConfig( + ComputeConfig.enabled, + WorkerThreadsConfig( + WorkerThreadsConfig.ThreadConfig.enabled, + WorkerThreadsConfig.LocalQueueConfig.enabled( + Attributes(Attribute("key", "value")) // the attributes + ), + WorkerThreadsConfig.TimerHeapConfig.enabled, + WorkerThreadsConfig.PollerConfig.enabled + ) + ) + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +### Work-stealing thread pool - timer heap + +To disable timer heap metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + import WorkStealingThreadPoolConfig._ + + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled, + WorkStealingThreadPoolConfig( + ComputeConfig.enabled, + WorkerThreadsConfig( + WorkerThreadsConfig.ThreadConfig.enabled, + WorkerThreadsConfig.LocalQueueConfig.enabled, + WorkerThreadsConfig.TimerHeapConfig.enabled, // disable timer heap metrics + WorkerThreadsConfig.PollerConfig.enabled + ) + ) + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +To attach attributes to timer heap metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + import WorkStealingThreadPoolConfig._ + + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled, + WorkStealingThreadPoolConfig( + ComputeConfig.enabled, + WorkerThreadsConfig( + WorkerThreadsConfig.ThreadConfig.enabled, + WorkerThreadsConfig.LocalQueueConfig.enabled, + WorkerThreadsConfig.TimerHeapConfig.enabled( + Attributes(Attribute("key", "value")) // the attributes + ), + WorkerThreadsConfig.PollerConfig.enabled + ) + ) + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +### Work-stealing thread pool - poller + +To disable poller metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + import WorkStealingThreadPoolConfig._ + + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled, + WorkStealingThreadPoolConfig( + ComputeConfig.enabled, + WorkerThreadsConfig( + WorkerThreadsConfig.ThreadConfig.enabled, + WorkerThreadsConfig.LocalQueueConfig.enabled, + WorkerThreadsConfig.TimerHeapConfig.enabled, + WorkerThreadsConfig.PollerConfig.disabled // disable poller metrics + ) + ) + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +To attach attributes to poller metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + import WorkStealingThreadPoolConfig._ + + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled, + WorkStealingThreadPoolConfig( + ComputeConfig.enabled, + WorkerThreadsConfig( + WorkerThreadsConfig.ThreadConfig.enabled, + WorkerThreadsConfig.LocalQueueConfig.enabled, + WorkerThreadsConfig.TimerHeapConfig.enabled, + WorkerThreadsConfig.PollerConfig.enabled( + Attributes(Attribute("key", "value")) // the attributes + ) + ) + ) + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` diff --git a/instrumentation/metrics/js-native/src/main/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsPlatform.scala b/instrumentation/metrics/js-native/src/main/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsPlatform.scala new file mode 100644 index 000000000..75421fa42 --- /dev/null +++ b/instrumentation/metrics/js-native/src/main/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsPlatform.scala @@ -0,0 +1,173 @@ +/* + * Copyright 2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.instrumentation.ce + +import cats.Show +import cats.effect.Resource +import cats.effect.Sync +import cats.effect.unsafe.metrics.{IORuntimeMetrics => CatsIORuntimeMetrics} +import cats.syntax.applicative._ +import org.typelevel.otel4s.Attributes +import org.typelevel.otel4s.metrics.MeterProvider + +private[ce] trait IORuntimeMetricsPlatform { + self: IORuntimeMetrics.type => + + sealed trait Config { + + /** The configuration of the CPU starvation metrics. + */ + def cpuStarvation: Config.CpuStarvationConfig + + override final def toString: String = + Show[Config].show(this) + } + + object Config { + + sealed trait CpuStarvationConfig { + + /** Indicates whether metrics are enabled. + */ + def enabled: Boolean + + /** The attributes to attach to the metrics. + */ + def attributes: Attributes + + override final def toString: String = + Show[CpuStarvationConfig].show(this) + } + + object CpuStarvationConfig { + + /** The metrics are enabled. + */ + def enabled: CpuStarvationConfig = + Impl(enabled = true, Attributes.empty) + + /** The metrics are enabled and the given `attributes` will be attached. + * + * @param attributes + * the attributes to attach to the metrics + */ + def enabled(attributes: Attributes): CpuStarvationConfig = + Impl(enabled = true, attributes) + + /** The metrics are disabled. + */ + def disabled: CpuStarvationConfig = + Impl(enabled = false, Attributes.empty) + + implicit val cpuStarvationConfigShow: Show[CpuStarvationConfig] = { cfg => + s"CpuStarvationConfig{enabled=${cfg.enabled}, attributes=${cfg.attributes}}" + } + + private case class Impl(enabled: Boolean, attributes: Attributes) extends CpuStarvationConfig + } + + /** The default configuration, the following metrics are enabled: + * - CPU starvation + */ + def default: Config = + Impl(CpuStarvationConfig.enabled) + + /** A configuration with the given `cpuStarvation`. + * + * @param cpuStarvation + * the CPU starvation configuration to use + */ + def apply(cpuStarvation: CpuStarvationConfig): Config = + Impl(cpuStarvation) + + implicit val configShow: Show[Config] = { cfg => + s"IORuntimeMetrics.Config{cpuStarvation=${cfg.cpuStarvation}}" + } + + private case class Impl(cpuStarvation: CpuStarvationConfig) extends Config + } + + /** Registers the following collectors depending on the `config`: + * - CPU starvation + * + * @example + * {{{ + * object Main extends IOApp.Simple { + * def program( + * meterProvider: MeterProvider[IO], + * tracerProvider: TracerProvider[IO] + * ): IO[Unit] = ??? + * + * def run: IO[Unit] = + * OpenTelemetrySdk.autoConfigured[IO]().use { autoConfigured => + * val sdk = autoConfigured.sdk + * implicit val mp: MeterProvider[IO] = sdk.meterProvider + * + * IORuntimeMetrics + * .register[IO](runtime.metrics, IORuntimeMetrics.Config.default) + * .surround { + * program(sdk.meterProvider, sdk.tracerProvider) + * } + * } + * } + * }}} + * + * =CPU starvation metrics= + * + * Registers the CPU starvation: + * - `cats.effect.runtime.cpu.starvation.count` + * - `cats.effect.runtime.cpu.starvation.clock.drift.current` + * - `cats.effect.runtime.cpu.starvation.clock.drift.max` + * + * To disable CPU starvation metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * IORuntimeMetrics.Config( + * CpuStarvationConfig.disabled // disable CPU starvation metrics + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * To attach attributes to CPU starvation metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled( + * Attributes(Attribute("key", "value")) // the attributes + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + */ + def register[F[_]: Sync: MeterProvider]( + metrics: CatsIORuntimeMetrics, + config: Config + ): Resource[F, Unit] = + Resource.eval(MeterProvider[F].get(Const.MeterNamespace)).flatMap { implicit meter => + cpuStarvationMetrics( + metrics.cpuStarvation, + config.cpuStarvation.attributes + ).whenA(config.cpuStarvation.enabled) + } + +} diff --git a/instrumentation/metrics/js-native/src/test/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsSuite.scala b/instrumentation/metrics/js-native/src/test/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsSuite.scala new file mode 100644 index 000000000..d876a48d4 --- /dev/null +++ b/instrumentation/metrics/js-native/src/test/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsSuite.scala @@ -0,0 +1,118 @@ +/* + * Copyright 2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.instrumentation.ce + +import cats.Show +import cats.effect.IO +import munit.CatsEffectSuite +import munit.ScalaCheckEffectSuite +import org.scalacheck.Arbitrary +import org.scalacheck.Prop +import org.scalacheck.effect.PropF +import org.typelevel.otel4s.Attributes +import org.typelevel.otel4s.metrics.MeterProvider +import org.typelevel.otel4s.scalacheck.Arbitraries._ +import org.typelevel.otel4s.sdk.metrics.data.MetricData +import org.typelevel.otel4s.sdk.testkit.metrics.MetricsTestkit + +class IORuntimeMetricsSuite extends CatsEffectSuite with ScalaCheckEffectSuite { + import IORuntimeMetrics.Config.CpuStarvationConfig + + test("register metrics using default config") { + MetricsTestkit.inMemory[IO]().use { testkit => + implicit val meterProvider: MeterProvider[IO] = testkit.meterProvider + + val expected = cpuStarvationMetrics + + for { + metrics <- IORuntimeMetrics + .register[IO](munitIORuntime.metrics, IORuntimeMetrics.Config.default) + .surround(testkit.collectMetrics) + } yield assertEquals(metrics.map(toMetric).sortBy(_.name), expected.sortBy(_.name)) + } + } + + test("register metrics according to the config") { + PropF.forAllF { (config: IORuntimeMetrics.Config) => + MetricsTestkit.inMemory[IO]().use { testkit => + implicit val meterProvider: MeterProvider[IO] = testkit.meterProvider + + val expected = List( + config.cpuStarvation.enabled -> cpuStarvationMetrics, + ).collect { case (true, metrics) => metrics }.flatten + + for { + metrics <- IORuntimeMetrics + .register[IO](munitIORuntime.metrics, config) + .surround(testkit.collectMetrics) + } yield assertEquals(metrics.map(toMetric).sortBy(_.name), expected.sortBy(_.name)) + } + } + } + + test("Show[IORuntimeMetrics.Config]") { + Prop.forAll { (config: IORuntimeMetrics.Config) => + val cpuStarvation = config.cpuStarvation + + val expected = "IORuntimeMetrics.Config{" + + s"cpuStarvation=CpuStarvationConfig{enabled=${cpuStarvation.enabled}, attributes=${cpuStarvation.attributes}}" + + "}" + + assertEquals(Show[IORuntimeMetrics.Config].show(config), expected) + assertEquals(config.toString, expected) + } + } + + private case class Metric(name: String, description: Option[String], unit: Option[String]) + + private def toMetric(metric: MetricData): Metric = + Metric(metric.name, metric.description, metric.unit) + + private val cpuStarvationMetrics = List( + Metric( + "cats.effect.runtime.cpu.starvation.count", + Some("The number of CPU starvation events."), + None + ), + Metric( + "cats.effect.runtime.cpu.starvation.clock.drift.current", + Some("The current CPU drift in milliseconds."), + Some("ms") + ), + Metric( + "cats.effect.runtime.cpu.starvation.clock.drift.max", + Some("The max CPU drift in milliseconds."), + Some("ms") + ) + ) + + private implicit val cpuStarvationArbitrary: Arbitrary[CpuStarvationConfig] = + Arbitrary( + for { + enabled <- Arbitrary.arbitrary[Boolean] + attributes <- Arbitrary.arbitrary[Attributes] + } yield if (enabled) CpuStarvationConfig.enabled(attributes) else CpuStarvationConfig.disabled + ) + + private implicit val configArbitrary: Arbitrary[IORuntimeMetrics.Config] = + Arbitrary( + for { + cpuStarvation <- Arbitrary.arbitrary[CpuStarvationConfig] + } yield IORuntimeMetrics.Config(cpuStarvation) + ) + +} diff --git a/instrumentation/metrics/jvm/src/main/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsPlatform.scala b/instrumentation/metrics/jvm/src/main/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsPlatform.scala new file mode 100644 index 000000000..43d422a83 --- /dev/null +++ b/instrumentation/metrics/jvm/src/main/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsPlatform.scala @@ -0,0 +1,1183 @@ +/* + * Copyright 2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.instrumentation.ce + +import cats.Show +import cats.effect.Resource +import cats.effect.Sync +import cats.effect.unsafe.metrics.{IORuntimeMetrics => CatsIORuntimeMetrics} +import cats.effect.unsafe.metrics.WorkStealingPoolMetrics +import cats.effect.unsafe.metrics.WorkerThreadMetrics +import cats.syntax.applicative._ +import cats.syntax.flatMap._ +import cats.syntax.foldable._ +import cats.syntax.functor._ +import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.Attributes +import org.typelevel.otel4s.metrics.Meter +import org.typelevel.otel4s.metrics.MeterProvider + +private[ce] trait IORuntimeMetricsPlatform { + self: IORuntimeMetrics.type => + + sealed trait Config { + + /** The configuration of the CPU starvation metrics. + */ + def cpuStarvation: Config.CpuStarvationConfig + + /** The configuration of the work-stealing thread pool (WSTP) metrics. + */ + def workStealingThreadPool: Config.WorkStealingThreadPoolConfig + + override final def toString: String = + Show[Config].show(this) + } + + object Config { + + sealed trait CpuStarvationConfig { + + /** Indicates whether metrics are enabled. + */ + def enabled: Boolean + + /** The attributes to attach to the metrics. + */ + def attributes: Attributes + + override final def toString: String = + Show[CpuStarvationConfig].show(this) + } + + object CpuStarvationConfig { + + /** The metrics are enabled. + */ + def enabled: CpuStarvationConfig = + Impl(enabled = true, Attributes.empty) + + /** The metrics are enabled and the given `attributes` will be attached. + * + * @param attributes + * the attributes to attach to the metrics + */ + def enabled(attributes: Attributes): CpuStarvationConfig = + Impl(enabled = true, attributes) + + /** The metrics are disabled. + */ + def disabled: CpuStarvationConfig = + Impl(enabled = false, Attributes.empty) + + implicit val cpuStarvationConfigShow: Show[CpuStarvationConfig] = { cfg => + s"CpuStarvationConfig{enabled=${cfg.enabled}, attributes=${cfg.attributes}}" + } + + private case class Impl(enabled: Boolean, attributes: Attributes) extends CpuStarvationConfig + } + + sealed trait WorkStealingThreadPoolConfig { + + /** The configuration of the pool metrics. + */ + def compute: WorkStealingThreadPoolConfig.ComputeConfig + + /** The configuration of the worker thread metrics. + */ + def workerThreads: WorkStealingThreadPoolConfig.WorkerThreadsConfig + + override final def toString: String = + Show[WorkStealingThreadPoolConfig].show(this) + } + + object WorkStealingThreadPoolConfig { + sealed trait ComputeConfig { + + /** Indicates whether metrics are enabled. + */ + def enabled: Boolean + + /** The attributes to attach to the metrics. + */ + def attributes: Attributes + + override final def toString: String = + Show[ComputeConfig].show(this) + } + + object ComputeConfig { + + /** The metrics are enabled. + */ + def enabled: ComputeConfig = + Impl(enabled = true, Attributes.empty) + + /** The metrics are enabled and the given `attributes` will be attached. + * + * @param attributes + * the attributes to attach to the metrics + */ + def enabled(attributes: Attributes): ComputeConfig = + Impl(enabled = true, attributes) + + /** The metrics are disabled. + */ + def disabled: ComputeConfig = + Impl(enabled = false, Attributes.empty) + + implicit val computeConfigShow: Show[ComputeConfig] = { cfg => + s"ComputeConfig{enabled=${cfg.enabled}, attributes=${cfg.attributes}}" + } + + private case class Impl(enabled: Boolean, attributes: Attributes) extends ComputeConfig + } + + sealed trait WorkerThreadsConfig { + + /** The configuration of the worker thread metrics. + */ + def thread: WorkerThreadsConfig.ThreadConfig + + /** The configuration of the local queue metrics. + */ + def localQueue: WorkerThreadsConfig.LocalQueueConfig + + /** The configuration of the timer heap metrics. + */ + def timerHeap: WorkerThreadsConfig.TimerHeapConfig + + /** The configuration of the ploler metrics. + */ + def poller: WorkerThreadsConfig.PollerConfig + + override final def toString: String = + Show[WorkerThreadsConfig].show(this) + } + + object WorkerThreadsConfig { + + sealed trait ThreadConfig { + + /** Indicates whether metrics are enabled. + */ + def enabled: Boolean + + /** The attributes to attach to the metrics. + */ + def attributes: Attributes + + override final def toString: String = + Show[ThreadConfig].show(this) + } + + object ThreadConfig { + + /** The metrics are enabled. + */ + def enabled: ThreadConfig = + Impl(enabled = true, Attributes.empty) + + /** The metrics are enabled and the given `attributes` will be attached. + * + * @param attributes + * the attributes to attach to the metrics + */ + def enabled(attributes: Attributes): ThreadConfig = + Impl(enabled = true, attributes) + + /** The metrics are disabled. + */ + def disabled: ThreadConfig = + Impl(enabled = false, Attributes.empty) + + implicit val threadConfigShow: Show[ThreadConfig] = { cfg => + s"ThreadConfig{enabled=${cfg.enabled}, attributes=${cfg.attributes}}" + } + + private case class Impl(enabled: Boolean, attributes: Attributes) extends ThreadConfig + + } + + sealed trait LocalQueueConfig { + + /** Indicates whether metrics are enabled. + */ + def enabled: Boolean + + /** The attributes to attach to the metrics. + */ + def attributes: Attributes + + override final def toString: String = + Show[LocalQueueConfig].show(this) + } + + object LocalQueueConfig { + + /** The metrics are enabled. + */ + def enabled: LocalQueueConfig = + Impl(enabled = true, Attributes.empty) + + /** The metrics are enabled and the given `attributes` will be attached. + * + * @param attributes + * the attributes to attach to the metrics + */ + def enabled(attributes: Attributes): LocalQueueConfig = + Impl(enabled = true, attributes) + + /** The metrics are disabled. + */ + def disabled: LocalQueueConfig = + Impl(enabled = false, Attributes.empty) + + implicit val localQueueConfigShow: Show[LocalQueueConfig] = { cfg => + s"LocalQueueConfig{enabled=${cfg.enabled}, attributes=${cfg.attributes}}" + } + + private case class Impl(enabled: Boolean, attributes: Attributes) extends LocalQueueConfig + } + + sealed trait TimerHeapConfig { + + /** Indicates whether metrics are enabled. + */ + def enabled: Boolean + + /** The attributes to attach to the metrics. + */ + def attributes: Attributes + + override final def toString: String = + Show[TimerHeapConfig].show(this) + } + + object TimerHeapConfig { + + /** The metrics are enabled. + */ + def enabled: TimerHeapConfig = + Impl(enabled = true, Attributes.empty) + + /** The metrics are enabled and the given `attributes` will be attached. + * + * @param attributes + * the attributes to attach to the metrics + */ + def enabled(attributes: Attributes): TimerHeapConfig = + Impl(enabled = true, attributes) + + /** The metrics are disabled. + */ + def disabled: TimerHeapConfig = + Impl(enabled = false, Attributes.empty) + + implicit val timerHeapConfigShow: Show[TimerHeapConfig] = { cfg => + s"TimerHeapConfig{enabled=${cfg.enabled}, attributes=${cfg.attributes}}" + } + + private case class Impl(enabled: Boolean, attributes: Attributes) extends TimerHeapConfig + } + + sealed trait PollerConfig { + + /** Indicates whether metrics are enabled. + */ + def enabled: Boolean + + /** The attributes to attach to the metrics. + */ + def attributes: Attributes + + override final def toString: String = + Show[PollerConfig].show(this) + } + + object PollerConfig { + + /** The metrics are enabled. + */ + def enabled: PollerConfig = + Impl(enabled = true, Attributes.empty) + + /** The metrics are enabled and the given `attributes` will be attached. + * + * @param attributes + * the attributes to attach to the metrics + */ + def enabled(attributes: Attributes): PollerConfig = + Impl(enabled = true, attributes) + + /** The metrics are disabled. + */ + def disabled: PollerConfig = + Impl(enabled = false, Attributes.empty) + + implicit val pollerConfigShow: Show[PollerConfig] = { cfg => + s"PollerConfig{enabled=${cfg.enabled}, attributes=${cfg.attributes}}" + } + + private case class Impl(enabled: Boolean, attributes: Attributes) extends PollerConfig + } + + /** A configuration with the given configs. + * + * @param thread + * the worker configuration to use + * + * @param localQueue + * the local queue configuration to use + * + * @param timerHeap + * the timer heap configuration to use + * + * @param poller + * the poller configuration to use + */ + def apply( + thread: ThreadConfig, + localQueue: LocalQueueConfig, + timerHeap: TimerHeapConfig, + poller: PollerConfig + ): WorkerThreadsConfig = + Impl(thread, localQueue, timerHeap, poller) + + /** All metrics (worker, local queue, timer heap, poller) are enabled. + */ + def enabled: WorkerThreadsConfig = + Impl(ThreadConfig.enabled, LocalQueueConfig.enabled, TimerHeapConfig.enabled, PollerConfig.enabled) + + /** All metrics (worker, local queue, timer heap, poller) are disabled. + */ + def disabled: WorkerThreadsConfig = + Impl(ThreadConfig.disabled, LocalQueueConfig.disabled, TimerHeapConfig.disabled, PollerConfig.disabled) + + implicit val workerThreadsConfigShow: Show[WorkerThreadsConfig] = { cfg => + "WorkerThreadsConfig{" + + s"thread=${cfg.thread}, " + + s"localQueue=${cfg.localQueue}, " + + s"timerHeap=${cfg.timerHeap}, " + + s"poller=${cfg.poller}}" + } + + private case class Impl( + thread: ThreadConfig, + localQueue: LocalQueueConfig, + timerHeap: TimerHeapConfig, + poller: PollerConfig + ) extends WorkerThreadsConfig + } + + /** A configuration with the given `compute` and `workerThreads` configurations. + * + * @param compute + * the compute configuration to use + * + * @param workerThreads + * the worker threads configuration to use + */ + def apply(compute: ComputeConfig, workerThreads: WorkerThreadsConfig): WorkStealingThreadPoolConfig = + Impl(compute, workerThreads) + + /** All metrics (pool, worker threads) are enabled. + */ + def enabled: WorkStealingThreadPoolConfig = + Impl(ComputeConfig.enabled, WorkerThreadsConfig.enabled) + + /** All metrics (pool, worker threads) are disabled. + */ + def disabled: WorkStealingThreadPoolConfig = + Impl(ComputeConfig.disabled, WorkerThreadsConfig.disabled) + + implicit val workStealingThreadPoolConfigShow: Show[WorkStealingThreadPoolConfig] = { cfg => + s"WorkStealingThreadPoolConfig{compute=${cfg.compute}, workerThreads=${cfg.workerThreads}}" + } + + private case class Impl( + compute: ComputeConfig, + workerThreads: WorkerThreadsConfig + ) extends WorkStealingThreadPoolConfig + } + + /** The default configuration, the following metrics are enabled: + * - CPU starvation + * - Work-stealing thread pool - worker + * - Work-stealing thread pool - local queue + * - Work-stealing thread pool - timer heap + * - Work-stealing thread pool - poller + */ + def default: Config = + Config(CpuStarvationConfig.enabled, WorkStealingThreadPoolConfig.enabled) + + /** A configuration with the given `cpuStarvation` and `workStealingThreadPool`. + * + * @param cpuStarvation + * the CPU starvation configuration to use + * + * @param workStealingThreadPool + * the work stealing thread pool configuration to use + */ + def apply(cpuStarvation: CpuStarvationConfig, workStealingThreadPool: WorkStealingThreadPoolConfig): Config = + Impl(cpuStarvation, workStealingThreadPool) + + implicit val configShow: Show[Config] = { cfg => + s"IORuntimeMetrics.Config{cpuStarvation=${cfg.cpuStarvation}, workStealingThreadPool=${cfg.workStealingThreadPool}}" + } + + private case class Impl( + cpuStarvation: CpuStarvationConfig, + workStealingThreadPool: WorkStealingThreadPoolConfig + ) extends Config + } + + /** Registers the following collectors depending on the `config`: + * - runtime pool metrics + * - runtime worker thread metrics + * - runtime local queue metrics + * - runtime timer heap metrics + * - runtime poller metrics + * - CPU starvation + * + * By default, all metrics are enabled. + * + * =CPU starvation metrics= + * + * Registers the CPU starvation: + * - `cats.effect.runtime.cpu.starvation.count` + * - `cats.effect.runtime.cpu.starvation.clock.drift.current` + * - `cats.effect.runtime.cpu.starvation.clock.drift.max` + * + * To disable CPU starvation metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * IORuntimeMetrics.Config( + * CpuStarvationConfig.disabled, // disable CPU starvation metrics + * WorkStealingThreadPoolConfig.enabled + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * To attach attributes to CPU starvation metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled( + * Attributes(Attribute("key", "value")) // the attributes + * ), + * WorkStealingThreadPoolConfig.enabled + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * =WSTP metrics= + * + * ==Compute metrics== + * + * Registers the runtime compute metrics: + * - `cats.effect.runtime.wstp.compute.thread.count` + * - `cats.effect.runtime.wstp.compute.thread.active.count` + * - `cats.effect.runtime.wstp.compute.thread.blocked.count` + * - `cats.effect.runtime.wstp.compute.thread.searching.count` + * - `cats.effect.runtime.wstp.compute.fiber.enqueued.count` + * + * Built-in attributes: + * - `pool.id` - the id of the work-stealing thread pool + * + * To disable WSTP pool metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * import WorkStealingThreadPoolConfig._ + * + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled, + * WorkStealingThreadPoolConfig( + * ComputeConfig.disabled, // disable compute metrics + * WorkerThreadsConfig.enabled + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * To attach attributes to WSTP pool metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * import WorkStealingThreadPoolConfig._ + * + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled, + * WorkStealingThreadPoolConfig( + * ComputeConfig.enabled( + * Attributes(Attribute("key", "value")) // the attributes + * ), + * WorkerThreadsConfig.enabled + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * ==Worker thread metrics== + * + * Registers the runtime worker metrics: + * - `cats.effect.runtime.wstp.worker.thread.idle.duration` + * - `cats.effect.runtime.wstp.worker.thread.event.count` + * + * Built-in attributes: + * - `pool.id` - the id of the work-stealing thread pool the queue is used by + * - `worker.index` - the index of the worker + * - `thread.event` - the thread event + * - `parked` - a thread is parked + * - `polled` - a thread is polled for I/O events + * - `blocked` - a thread is switched to a blocking thread and been replaced + * - `respawn` - a thread is replaced by a newly spawned thread + * + * To disable WSTP worker thread metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * import WorkStealingThreadPoolConfig._ + * + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled, + * WorkStealingThreadPoolConfig( + * ComputeConfig.enabled, + * WorkerThreadsConfig( + * WorkerThreadsConfig.ThreadConfig.disabled, // disable worker thread metrics + * WorkerThreadsConfig.LocalQueueConfig.enabled, + * WorkerThreadsConfig.TimerHeapConfig.enabled, + * WorkerThreadsConfig.PollerConfig.enabled + * ) + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * To attach attributes to WSTP worker thread metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * import WorkStealingThreadPoolConfig._ + * + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled, + * WorkStealingThreadPoolConfig( + * ComputeConfig.enabled, + * WorkerThreadsConfig( + * WorkerThreadsConfig.ThreadConfig.enabled( + * Attributes(Attribute("key", "value")) // the attributes + * ), + * WorkerThreadsConfig.LocalQueueConfig.enabled, + * WorkerThreadsConfig.TimerHeapConfig.enabled, + * WorkerThreadsConfig.PollerConfig.enabled + * ) + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * ==Local queue metrics== + * + * Registers the runtime local queue metrics: + * - `cats.effect.runtime.wstp.worker.localqueue.fiber.enqueued.count` + * - `cats.effect.runtime.wstp.worker.localqueue.fiber.spillover.count` + * - `cats.effect.runtime.wstp.worker.localqueue.fiber.steal.attempt.count` + * - `cats.effect.runtime.wstp.worker.localqueue.fiber.stolen.count` + * - `cats.effect.runtime.wstp.worker.localqueue.fiber.count` + * + * Built-in attributes: + * - `pool.id` - the id of the work-stealing thread pool the queue is used by + * - `worker.index` - the index of the worker the queue is used by + * + * To disable WSTP local queue metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * import WorkStealingThreadPoolConfig._ + * + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled, + * WorkStealingThreadPoolConfig( + * ComputeConfig.enabled, + * WorkerThreadsConfig( + * WorkerThreadsConfig.ThreadConfig.enabled, + * WorkerThreadsConfig.LocalQueueConfig.disabled, // disable local queue metrics + * WorkerThreadsConfig.TimerHeapConfig.enabled, + * WorkerThreadsConfig.PollerConfig.enabled + * ) + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * To attach attributes to WSTP local queue metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * import WorkStealingThreadPoolConfig._ + * + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled, + * WorkStealingThreadPoolConfig( + * ComputeConfig.enabled, + * WorkerThreadsConfig( + * WorkerThreadsConfig.ThreadConfig.enabled, + * WorkerThreadsConfig.LocalQueueConfig.enabled( + * Attributes(Attribute("key", "value")) // the attributes + * ), + * WorkerThreadsConfig.TimerHeapConfig.enabled, + * WorkerThreadsConfig.PollerConfig.enabled + * ) + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * ==Timer heap metrics== + * + * Registers the runtime time heap metrics: + * - `cats.effect.runtime.wstp.worker.timerheap.outstanding.count` + * - `cats.effect.runtime.wstp.worker.timerheap.packed.count` + * - `cats.effect.runtime.wstp.worker.timerheap.timer.count` + * - `cats.effect.runtime.wstp.worker.timerheap.next.due` + * + * Built-in attributes: + * - `pool.id` - the id of the work-stealing thread pool the time heap is used by + * - `worker.index` - the index of the worker the timer heap is used by + * - `timer.state` - the state of the timer + * - `executed` - the successfully executed timer + * - `scheduled` - the scheduled timer + * - `canceled` - the canceled timer + * + * To disable WSTP timer heap metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * import WorkStealingThreadPoolConfig._ + * + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled, + * WorkStealingThreadPoolConfig( + * ComputeConfig.enabled, + * WorkerThreadsConfig( + * WorkerThreadsConfig.ThreadConfig.enabled, + * WorkerThreadsConfig.LocalQueueConfig.enabled, + * WorkerThreadsConfig.TimerHeapConfig.enabled, // disable timer heap metrics + * WorkerThreadsConfig.PollerConfig.enabled + * ) + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * To attach attributes to WSTP timer heap metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * import WorkStealingThreadPoolConfig._ + * + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled, + * WorkStealingThreadPoolConfig( + * ComputeConfig.enabled, + * WorkerThreadsConfig( + * WorkerThreadsConfig.ThreadConfig.enabled, + * WorkerThreadsConfig.LocalQueueConfig.enabled, + * WorkerThreadsConfig.TimerHeapConfig.enabled( + * Attributes(Attribute("key", "value")) // the attributes + * ), + * WorkerThreadsConfig.PollerConfig.enabled + * ) + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * ==Poller metrics== + * + * Registers the runtime poller metrics: + * - `cats.effect.runtime.wstp.worker.poller.operation.outstanding.count` + * - `cats.effect.runtime.wstp.worker.poller.operation.count` + * + * Built-in attributes: + * - `pool.id` - the id of the work-stealing thread pool the poller is used by + * - `worker.index` - the index of the worker thread the poller is used by + * - `poller.operation` - the operation performed by the poller + * - `accept` + * - `connect` + * - `read` + * - `write` + * - `poller.operation.status` - the status of the operation + * - `submitted` - the operation has been submitted + * - `succeeded` - the operation has errored + * - `errored` - the operation has errored + * - `canceled` - the operation has been canceled + * + * To disable WSTP poller metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * import WorkStealingThreadPoolConfig._ + * + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled, + * WorkStealingThreadPoolConfig( + * ComputeConfig.enabled, + * WorkerThreadsConfig( + * WorkerThreadsConfig.ThreadConfig.enabled, + * WorkerThreadsConfig.LocalQueueConfig.enabled, + * WorkerThreadsConfig.TimerHeapConfig.enabled, + * WorkerThreadsConfig.PollerConfig.disabled // disable poller metrics + * ) + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * To attach attributes to WSTP poller metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * import WorkStealingThreadPoolConfig._ + * + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled, + * WorkStealingThreadPoolConfig( + * ComputeConfig.enabled, + * WorkerThreadsConfig( + * WorkerThreadsConfig.ThreadConfig.enabled, + * WorkerThreadsConfig.LocalQueueConfig.enabled, + * WorkerThreadsConfig.TimerHeapConfig.enabled, + * WorkerThreadsConfig.PollerConfig.enabled( + * Attributes(Attribute("key", "value")) // the attributes + * ) + * ) + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * @example + * {{{ + * object Main extends IOApp.Simple { + * def program( + * meterProvider: MeterProvider[IO], + * tracerProvider: TracerProvider[IO] + * ): IO[Unit] = ??? + * + * def run: IO[Unit] = + * OtelJava.autoConfigured[IO]().use { otel4s => + * implicit val mp: MeterProvider[IO] = otel4s.meterProvider + * + * IORuntimeMetrics + * .register[IO](runtime.metrics, IORuntimeMetrics.Config.default) + * .surround { + * program(otel4s.meterProvider, otel4s.tracerProvider) + * } + * } + * } + * }}} + */ + def register[F[_]: Sync: MeterProvider]( + metrics: CatsIORuntimeMetrics, + config: Config + ): Resource[F, Unit] = + Resource.eval(MeterProvider[F].get(Const.MeterNamespace)).flatMap { implicit meter => + val wstpMetrics = metrics.workStealingThreadPool match { + case Some(pool) => + val poolId = pool.identifier + + val computeConfig = config.workStealingThreadPool.compute + val workerThreadsConfig = config.workStealingThreadPool.workerThreads + + for { + _ <- computeMetrics(poolId, pool, computeConfig.attributes) + .whenA(computeConfig.enabled) + + _ <- threadMetrics(poolId, pool.workerThreads, workerThreadsConfig.thread.attributes) + .whenA(workerThreadsConfig.thread.enabled) + + _ <- localQueueMetrics(poolId, pool.workerThreads, workerThreadsConfig.localQueue.attributes) + .whenA(workerThreadsConfig.localQueue.enabled) + + _ <- timerHeapMetrics(poolId, pool.workerThreads, workerThreadsConfig.timerHeap.attributes) + .whenA(workerThreadsConfig.timerHeap.enabled) + + _ <- pollerMetrics(poolId, pool.workerThreads, workerThreadsConfig.poller.attributes) + .whenA(workerThreadsConfig.poller.enabled) + } yield () + + case None => + Resource.unit[F] + } + + for { + _ <- cpuStarvationMetrics(metrics.cpuStarvation, config.cpuStarvation.attributes) + .whenA(config.cpuStarvation.enabled) + + _ <- wstpMetrics + } yield () + } + + private def computeMetrics[F[_]: Sync: Meter]( + poolId: String, + metrics: WorkStealingPoolMetrics, + extraAttributes: Attributes + ): Resource[F, Unit] = { + val prefix = s"${Const.MeterNamespace}.wstp.compute" + + Meter[F].batchCallback.of( + Meter[F] + .observableGauge[Long](s"$prefix.thread.count") + .withDescription( + "The number of worker thread instances backing the work-stealing thread pool (WSTP)." + ) + .withUnit("{thread}") + .createObserver, + Meter[F] + .observableGauge[Long](s"$prefix.thread.active.count") + .withDescription( + "The number of active worker thread instances currently executing fibers on the compute thread pool." + ) + .withUnit("{thread}") + .createObserver, + Meter[F] + .observableGauge[Long](s"$prefix.thread.searching.count") + .withDescription( + "The number of worker thread instances currently searching for fibers to steal from other worker threads." + ) + .withUnit("{thread}") + .createObserver, + Meter[F] + .observableGauge[Long](s"$prefix.thread.blocked.count") + .withDescription( + "The number of worker thread instances that can run blocking actions on the compute thread pool." + ) + .withUnit("{thread}") + .createObserver, + Meter[F] + .observableGauge[Long](s"$prefix.fiber.enqueued.count") + .withDescription("The total number of fibers enqueued on all local queues.") + .withUnit("{fiber}") + .createObserver, + Meter[F] + .observableGauge[Long](s"$prefix.fiber.suspended.count") + .withDescription("The number of fibers which are currently asynchronously suspended.") + .withUnit("{fiber}") + .createObserver + ) { (total, active, searching, blocked, enqueued, suspended) => + val attributes = Attributes(Attribute("pool.id", poolId)) ++ extraAttributes + + for { + snapshot <- Sync[F].delay( + ( + metrics.workerThreadCount(), + metrics.activeThreadCount(), + metrics.searchingThreadCount(), + metrics.blockedWorkerThreadCount(), + metrics.localQueueFiberCount(), + metrics.suspendedFiberCount(), + ) + ) + _ <- total.record(snapshot._1, attributes) + _ <- active.record(snapshot._2, attributes) + _ <- searching.record(snapshot._3, attributes) + _ <- blocked.record(snapshot._4, attributes) + _ <- enqueued.record(snapshot._5, attributes) + _ <- suspended.record(snapshot._6, attributes) + } yield () + } + } + + private def threadMetrics[F[_]: Sync: Meter]( + poolId: String, + metrics: List[WorkerThreadMetrics], + extraAttributes: Attributes + ): Resource[F, Unit] = { + val prefix = s"${Const.MeterNamespace}.wstp.worker.thread" + + Meter[F].batchCallback.of( + Meter[F] + .observableCounter[Long](s"$prefix.idle.duration") + .withDescription("The total amount of time in nanoseconds that this WorkerThread has been idle.") + .withUnit("ns") + .createObserver, + Meter[F] + .observableCounter[Long](s"$prefix.event.count") + .withDescription("The total number of events that happened to this WorkerThread.") + .withUnit("{event}") + .createObserver, + ) { (idleDuration, eventCount) => + metrics.traverse_ { workerMetrics => + val attributes = Attributes( + Attribute("pool.id", poolId), + Attribute("worker.index", workerMetrics.index.toLong) + ) ++ extraAttributes + + def recordCount(value: Long, state: String): F[Unit] = + eventCount.record( + value, + attributes ++ Attributes(Attribute("thread.event", state)) + ) + + for { + snapshot <- Sync[F].delay( + ( + workerMetrics.idleTime(), + workerMetrics.parkedCount(), + workerMetrics.polledCount(), + workerMetrics.blockingCount(), + workerMetrics.respawnCount() + ) + ) + _ <- idleDuration.record(snapshot._1, attributes) + _ <- recordCount(snapshot._2, "parked") + _ <- recordCount(snapshot._3, "polled") + _ <- recordCount(snapshot._4, "blocked") + _ <- recordCount(snapshot._5, "respawn") + } yield () + } + } + } + + private def localQueueMetrics[F[_]: Sync: Meter]( + poolId: String, + metrics: List[WorkerThreadMetrics], + extraAttributes: Attributes + ): Resource[F, Unit] = { + val prefix = s"${Const.MeterNamespace}.wstp.worker.localqueue" + + Meter[F].batchCallback.of( + Meter[F] + .observableUpDownCounter[Long](s"$prefix.fiber.enqueued.count") + .withDescription("The current number of enqueued fibers.") + .withUnit("{fiber}") + .createObserver, + Meter[F] + .observableCounter[Long](s"$prefix.fiber.count") + .withDescription( + "The total number of fibers enqueued during the lifetime of the local queue." + ) + .withUnit("{fiber}") + .createObserver, + Meter[F] + .observableCounter[Long](s"$prefix.fiber.spillover.count") + .withDescription("The total number of fibers spilt over to the external queue.") + .withUnit("{fiber}") + .createObserver, + Meter[F] + .observableCounter[Long](s"$prefix.fiber.steal_attempt.count") + .withDescription("The total number of successful steal attempts by other worker threads.") + .withUnit("{fiber}") + .createObserver, + Meter[F] + .observableCounter[Long](s"$prefix.fiber.stolen.count") + .withDescription("The total number of stolen fibers by other worker threads.") + .withUnit("{fiber}") + .createObserver + ) { (fiberEnqueued, fiberTotal, fiberSpillover, stealAttemptCount, stolenCount) => + metrics.traverse_ { workerMetrics => + val attributes = Attributes( + Attribute("pool.id", poolId), + Attribute("worker.index", workerMetrics.index.toLong) + ) ++ extraAttributes + + for { + snapshot <- Sync[F].delay( + ( + workerMetrics.localQueue.fiberCount(), + workerMetrics.localQueue.totalFiberCount(), + workerMetrics.localQueue.totalSpilloverCount(), + workerMetrics.localQueue.successfulStealAttemptCount(), + workerMetrics.localQueue.stolenFiberCount() + ) + ) + _ <- fiberEnqueued.record(snapshot._1, attributes) + _ <- fiberTotal.record(snapshot._2, attributes) + _ <- fiberSpillover.record(snapshot._3, attributes) + _ <- stealAttemptCount.record(snapshot._4, attributes) + _ <- stolenCount.record(snapshot._5, attributes) + } yield () + } + } + } + + private def timerHeapMetrics[F[_]: Sync: Meter]( + poolId: String, + metrics: List[WorkerThreadMetrics], + extraAttributes: Attributes + ): Resource[F, Unit] = { + val prefix = s"${Const.MeterNamespace}.wstp.worker.timerheap" + + Meter[F].batchCallback.of( + Meter[F] + .observableUpDownCounter[Long](s"$prefix.outstanding.count") + .withDescription("The current number of the outstanding timers, that remain to be executed.") + .withUnit("{timer}") + .createObserver, + Meter[F] + .observableCounter[Long](s"$prefix.timer.count") + .withDescription("The total number of the timers per state.") + .withUnit("{timer}") + .createObserver, + Meter[F] + .observableCounter[Long](s"$prefix.packed.count") + .withDescription("The total number of times the heap packed itself to remove canceled timers.") + .withUnit("{event}") + .createObserver, + Meter[F] + .observableGauge[Long](s"$prefix.next.due") + .withDescription("Returns the time in nanoseconds till the next due to fire.") + .withUnit("ns") + .createObserver + ) { (outstanding, timerCount, packedCount, nextDue) => + metrics.traverse_ { workerMetrics => + val attributes = Attributes( + Attribute("pool.id", poolId), + Attribute("worker.index", workerMetrics.index.toLong) + ) ++ extraAttributes + + def recordCount(value: Long, state: String): F[Unit] = + timerCount.record( + value, + attributes ++ Attributes(Attribute("timer.state", state)) + ) + + for { + snapshot <- Sync[F].delay( + ( + workerMetrics.timerHeap.timersOutstandingCount(), + workerMetrics.timerHeap.totalTimersExecutedCount(), + workerMetrics.timerHeap.totalTimersScheduledCount(), + workerMetrics.timerHeap.totalTimersCanceledCount(), + workerMetrics.timerHeap.packCount(), + workerMetrics.timerHeap.nextTimerDue(), + ) + ) + _ <- outstanding.record(snapshot._1, attributes) + _ <- recordCount(snapshot._2, "executed") + _ <- recordCount(snapshot._3, "scheduled") + _ <- recordCount(snapshot._4, "canceled") + _ <- packedCount.record(snapshot._5, attributes) + _ <- nextDue.record(snapshot._6.getOrElse(0L), attributes) + } yield () + } + } + } + + private def pollerMetrics[F[_]: Sync: Meter]( + poolId: String, + metrics: List[WorkerThreadMetrics], + extraAttributes: Attributes + ): Resource[F, Unit] = { + val prefix = s"${Const.MeterNamespace}.wstp.worker.poller" + + Meter[F].batchCallback.of( + Meter[F] + .observableUpDownCounter[Long](s"$prefix.operation.outstanding.count") + .withDescription("The current number of outstanding operations per category and outcome.") + .withUnit("{operation}") + .createObserver, + Meter[F] + .observableCounter[Long](s"$prefix.operation.count") + .withDescription("The total number of the operations per category and outcome.") + .withUnit("{operation}") + .createObserver + ) { (operationCount, operationOutstanding) => + metrics.traverse_ { workerMetrics => + val attributes = Attributes( + Attribute("pool.id", poolId), + Attribute("worker.index", workerMetrics.index.toLong) + ) ++ extraAttributes + + val poller = workerMetrics.poller + + def recordOutstanding(value: Long, operation: String): F[Unit] = + operationOutstanding.record( + value, + attributes ++ Attributes(Attribute("poller.operation", operation)) + ) + + def recordTotal(value: Long, operation: String, status: String): F[Unit] = + operationCount.record( + value, + attributes ++ Attributes( + Attribute("poller.operation", operation), + Attribute("poller.operation.status", status) + ) + ) + + Sync[F].defer { + for { + _ <- recordOutstanding(poller.totalAcceptOperationsSubmittedCount(), "accept") + _ <- recordTotal(poller.totalAcceptOperationsSubmittedCount(), "accept", "submitted") + _ <- recordTotal(poller.totalAcceptOperationsSucceededCount(), "accept", "succeeded") + _ <- recordTotal(poller.totalAcceptOperationsErroredCount(), "accept", "errored") + _ <- recordTotal(poller.totalAcceptOperationsCanceledCount(), "accept", "canceled") + + _ <- recordOutstanding(poller.totalConnectOperationsSubmittedCount(), "connect") + _ <- recordTotal(poller.totalConnectOperationsSubmittedCount(), "connect", "submitted") + _ <- recordTotal(poller.totalConnectOperationsSucceededCount(), "connect", "succeeded") + _ <- recordTotal(poller.totalConnectOperationsErroredCount(), "connect", "errored") + _ <- recordTotal(poller.totalConnectOperationsCanceledCount(), "connect", "canceled") + + _ <- recordOutstanding(poller.totalReadOperationsSubmittedCount(), "read") + _ <- recordTotal(poller.totalReadOperationsSubmittedCount(), "read", "submitted") + _ <- recordTotal(poller.totalReadOperationsSucceededCount(), "read", "succeeded") + _ <- recordTotal(poller.totalReadOperationsErroredCount(), "read", "errored") + _ <- recordTotal(poller.totalReadOperationsCanceledCount(), "read", "canceled") + + _ <- recordOutstanding(poller.totalWriteOperationsSubmittedCount(), "write") + _ <- recordTotal(poller.totalWriteOperationsSubmittedCount(), "write", "submitted") + _ <- recordTotal(poller.totalWriteOperationsSucceededCount(), "write", "succeeded") + _ <- recordTotal(poller.totalWriteOperationsErroredCount(), "write", "errored") + _ <- recordTotal(poller.totalWriteOperationsCanceledCount(), "write", "canceled") + } yield () + } + } + } + } + +} diff --git a/instrumentation/metrics/jvm/src/test/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsSuite.scala b/instrumentation/metrics/jvm/src/test/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsSuite.scala new file mode 100644 index 000000000..640098445 --- /dev/null +++ b/instrumentation/metrics/jvm/src/test/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsSuite.scala @@ -0,0 +1,297 @@ +/* + * Copyright 2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.instrumentation.ce + +import cats.Show +import cats.effect.IO +import munit.CatsEffectSuite +import munit.ScalaCheckEffectSuite +import org.scalacheck.Arbitrary +import org.scalacheck.Prop +import org.scalacheck.effect.PropF +import org.typelevel.otel4s.Attributes +import org.typelevel.otel4s.metrics.MeterProvider +import org.typelevel.otel4s.scalacheck.Arbitraries._ +import org.typelevel.otel4s.sdk.metrics.data.MetricData +import org.typelevel.otel4s.sdk.testkit.metrics.MetricsTestkit + +class IORuntimeMetricsSuite extends CatsEffectSuite with ScalaCheckEffectSuite { + import IORuntimeMetrics.Config.{CpuStarvationConfig, WorkStealingThreadPoolConfig} + import IORuntimeMetrics.Config.WorkStealingThreadPoolConfig.{ComputeConfig, WorkerThreadsConfig} + import IORuntimeMetrics.Config.WorkStealingThreadPoolConfig.WorkerThreadsConfig._ + + test("register metrics using default config") { + MetricsTestkit.inMemory[IO]().use { testkit => + implicit val meterProvider: MeterProvider[IO] = testkit.meterProvider + + val expected = + cpuStarvationMetrics ++ computeMetrics ++ threadMetrics ++ localQueueMetrics ++ timerHeapMetrics ++ pollerMetrics + + for { + metrics <- IORuntimeMetrics + .register[IO](munitIORuntime.metrics, IORuntimeMetrics.Config.default) + .surround(testkit.collectMetrics) + } yield assertEquals(metrics.map(toMetric).sortBy(_.name), expected.sortBy(_.name)) + } + } + + test("register metrics according to the config") { + PropF.forAllF { (config: IORuntimeMetrics.Config) => + MetricsTestkit.inMemory[IO]().use { testkit => + implicit val meterProvider: MeterProvider[IO] = testkit.meterProvider + + val expected = List( + config.cpuStarvation.enabled -> cpuStarvationMetrics, + config.workStealingThreadPool.compute.enabled -> computeMetrics, + config.workStealingThreadPool.workerThreads.thread.enabled -> threadMetrics, + config.workStealingThreadPool.workerThreads.localQueue.enabled -> localQueueMetrics, + config.workStealingThreadPool.workerThreads.timerHeap.enabled -> timerHeapMetrics, + config.workStealingThreadPool.workerThreads.poller.enabled -> pollerMetrics + ).collect { case (true, metrics) => metrics }.flatten + + for { + metrics <- IORuntimeMetrics + .register[IO](munitIORuntime.metrics, config) + .surround(testkit.collectMetrics) + } yield assertEquals(metrics.map(toMetric).sortBy(_.name), expected.sortBy(_.name)) + } + } + } + + test("Show[IORuntimeMetrics.Config]") { + Prop.forAll { (config: IORuntimeMetrics.Config) => + val cpuStarvation = config.cpuStarvation + val compute = config.workStealingThreadPool.compute + val workerThreads = config.workStealingThreadPool.workerThreads + + val expected = "IORuntimeMetrics.Config{" + + s"cpuStarvation=CpuStarvationConfig{enabled=${cpuStarvation.enabled}, attributes=${cpuStarvation.attributes}}, " + + "workStealingThreadPool=WorkStealingThreadPoolConfig{" + + s"compute=ComputeConfig{enabled=${compute.enabled}, attributes=${compute.attributes}}, " + + s"workerThreads=WorkerThreadsConfig{" + + s"thread=ThreadConfig{enabled=${workerThreads.thread.enabled}, attributes=${workerThreads.thread.attributes}}, " + + s"localQueue=LocalQueueConfig{enabled=${workerThreads.localQueue.enabled}, attributes=${workerThreads.localQueue.attributes}}, " + + s"timerHeap=TimerHeapConfig{enabled=${workerThreads.timerHeap.enabled}, attributes=${workerThreads.timerHeap.attributes}}, " + + s"poller=PollerConfig{enabled=${workerThreads.poller.enabled}, attributes=${workerThreads.poller.attributes}}}}" + + "}" + + assertEquals(Show[IORuntimeMetrics.Config].show(config), expected) + assertEquals(config.toString, expected) + } + } + + private case class Metric(name: String, description: Option[String], unit: Option[String]) + + private def toMetric(metric: MetricData): Metric = + Metric(metric.name, metric.description, metric.unit) + + private val cpuStarvationMetrics = List( + Metric( + "cats.effect.runtime.cpu.starvation.count", + Some("The number of CPU starvation events."), + None + ), + Metric( + "cats.effect.runtime.cpu.starvation.clock.drift.current", + Some("The current CPU drift in milliseconds."), + Some("ms") + ), + Metric( + "cats.effect.runtime.cpu.starvation.clock.drift.max", + Some("The max CPU drift in milliseconds."), + Some("ms") + ) + ) + + private val computeMetrics = List( + Metric( + "cats.effect.runtime.wstp.compute.thread.count", + Some("The number of worker thread instances backing the work-stealing thread pool (WSTP)."), + Some("{thread}") + ), + Metric( + "cats.effect.runtime.wstp.compute.thread.active.count", + Some("The number of active worker thread instances currently executing fibers on the compute thread pool."), + Some("{thread}") + ), + Metric( + "cats.effect.runtime.wstp.compute.thread.searching.count", + Some("The number of worker thread instances currently searching for fibers to steal from other worker threads."), + Some("{thread}") + ), + Metric( + "cats.effect.runtime.wstp.compute.thread.blocked.count", + Some("The number of worker thread instances that can run blocking actions on the compute thread pool."), + Some("{thread}") + ), + Metric( + "cats.effect.runtime.wstp.compute.fiber.enqueued.count", + Some("The total number of fibers enqueued on all local queues."), + Some("{fiber}") + ), + Metric( + "cats.effect.runtime.wstp.compute.fiber.suspended.count", + Some("The number of fibers which are currently asynchronously suspended."), + Some("{fiber}") + ) + ) + + private val threadMetrics = List( + Metric( + "cats.effect.runtime.wstp.worker.thread.idle.duration", + Some("The total amount of time in nanoseconds that this WorkerThread has been idle."), + Some("ns") + ), + Metric( + "cats.effect.runtime.wstp.worker.thread.event.count", + Some("The total number of events that happened to this WorkerThread."), + Some("{event}") + ), + ) + + private val localQueueMetrics = List( + Metric( + "cats.effect.runtime.wstp.worker.localqueue.fiber.count", + Some("The total number of fibers enqueued during the lifetime of the local queue."), + Some("{fiber}") + ), + Metric( + "cats.effect.runtime.wstp.worker.localqueue.fiber.enqueued.count", + Some("The current number of enqueued fibers."), + Some("{fiber}") + ), + Metric( + "cats.effect.runtime.wstp.worker.localqueue.fiber.spillover.count", + Some("The total number of fibers spilt over to the external queue."), + Some("{fiber}") + ), + Metric( + "cats.effect.runtime.wstp.worker.localqueue.fiber.steal_attempt.count", + Some("The total number of successful steal attempts by other worker threads."), + Some("{fiber}") + ), + Metric( + "cats.effect.runtime.wstp.worker.localqueue.fiber.stolen.count", + Some("The total number of stolen fibers by other worker threads."), + Some("{fiber}") + ), + ) + + private val timerHeapMetrics = List( + Metric( + "cats.effect.runtime.wstp.worker.timerheap.outstanding.count", + Some("The current number of the outstanding timers, that remain to be executed."), + Some("{timer}") + ), + Metric( + "cats.effect.runtime.wstp.worker.timerheap.timer.count", + Some("The total number of the timers per state."), + Some("{timer}") + ), + Metric( + "cats.effect.runtime.wstp.worker.timerheap.packed.count", + Some("The total number of times the heap packed itself to remove canceled timers."), + Some("{event}") + ), + Metric( + "cats.effect.runtime.wstp.worker.timerheap.next.due", + Some("Returns the time in nanoseconds till the next due to fire."), + Some("ns") + ), + ) + + private val pollerMetrics = List( + Metric( + "cats.effect.runtime.wstp.worker.poller.operation.outstanding.count", + Some("The current number of outstanding operations per category and outcome."), + Some("{operation}") + ), + Metric( + "cats.effect.runtime.wstp.worker.poller.operation.count", + Some("The total number of the operations per category and outcome."), + Some("{operation}") + ) + ) + + private implicit val cpuStarvationConfigArbitrary: Arbitrary[CpuStarvationConfig] = + Arbitrary( + for { + enabled <- Arbitrary.arbitrary[Boolean] + attributes <- Arbitrary.arbitrary[Attributes] + } yield if (enabled) CpuStarvationConfig.enabled(attributes) else CpuStarvationConfig.disabled + ) + + private implicit val computeConfigArbitrary: Arbitrary[ComputeConfig] = + Arbitrary( + for { + enabled <- Arbitrary.arbitrary[Boolean] + attributes <- Arbitrary.arbitrary[Attributes] + } yield if (enabled) ComputeConfig.enabled(attributes) else ComputeConfig.disabled + ) + + private implicit val localQueueConfigArbitrary: Arbitrary[LocalQueueConfig] = + Arbitrary( + for { + enabled <- Arbitrary.arbitrary[Boolean] + attributes <- Arbitrary.arbitrary[Attributes] + } yield if (enabled) LocalQueueConfig.enabled(attributes) else LocalQueueConfig.disabled + ) + + private implicit val threadConfigArbitrary: Arbitrary[ThreadConfig] = + Arbitrary( + for { + enabled <- Arbitrary.arbitrary[Boolean] + attributes <- Arbitrary.arbitrary[Attributes] + } yield if (enabled) ThreadConfig.enabled(attributes) else ThreadConfig.disabled + ) + + private implicit val timerHeapConfigArbitrary: Arbitrary[TimerHeapConfig] = + Arbitrary( + for { + enabled <- Arbitrary.arbitrary[Boolean] + attributes <- Arbitrary.arbitrary[Attributes] + } yield if (enabled) TimerHeapConfig.enabled(attributes) else TimerHeapConfig.disabled + ) + + private implicit val pollerConfigArbitrary: Arbitrary[PollerConfig] = + Arbitrary( + for { + enabled <- Arbitrary.arbitrary[Boolean] + attributes <- Arbitrary.arbitrary[Attributes] + } yield if (enabled) PollerConfig.enabled(attributes) else PollerConfig.disabled + ) + + private implicit val workerThreadsConfigArbitrary: Arbitrary[WorkerThreadsConfig] = + Arbitrary( + for { + thread <- Arbitrary.arbitrary[ThreadConfig] + localQueue <- Arbitrary.arbitrary[LocalQueueConfig] + timerHeap <- Arbitrary.arbitrary[TimerHeapConfig] + poller <- Arbitrary.arbitrary[PollerConfig] + } yield WorkerThreadsConfig(thread, localQueue, timerHeap, poller) + ) + + private implicit val configArbitrary: Arbitrary[IORuntimeMetrics.Config] = + Arbitrary( + for { + cpuStarvation <- Arbitrary.arbitrary[CpuStarvationConfig] + pool <- Arbitrary.arbitrary[ComputeConfig] + workerThreads <- Arbitrary.arbitrary[WorkerThreadsConfig] + } yield IORuntimeMetrics.Config(cpuStarvation, WorkStealingThreadPoolConfig(pool, workerThreads)) + ) + +} diff --git a/instrumentation/metrics/shared/src/main/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetrics.scala b/instrumentation/metrics/shared/src/main/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetrics.scala new file mode 100644 index 000000000..3560a2bc3 --- /dev/null +++ b/instrumentation/metrics/shared/src/main/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetrics.scala @@ -0,0 +1,78 @@ +/* + * Copyright 2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.instrumentation.ce + +import cats.effect.Resource +import cats.effect.Sync +import cats.effect.unsafe.metrics.CpuStarvationMetrics +import cats.syntax.flatMap._ +import cats.syntax.functor._ +import org.typelevel.otel4s.Attributes +import org.typelevel.otel4s.metrics.Meter + +object IORuntimeMetrics extends IORuntimeMetricsPlatform { + + protected object Const { + val MeterNamespace = "cats.effect.runtime" + } + + /** Registers the CPU starvation: + * - `cats.effect.runtime.cpu.starvation.count` + * - `cats.effect.runtime.cpu.starvation.clock.drift.current` + * - `cats.effect.runtime.cpu.starvation.clock.drift.max` + * + * @param attributes + * the attributes to attach to the metrics + */ + protected def cpuStarvationMetrics[F[_]: Sync: Meter]( + metrics: CpuStarvationMetrics, + attributes: Attributes + ): Resource[F, Unit] = { + val prefix = s"${Const.MeterNamespace}.cpu.starvation" + + Meter[F].batchCallback.of( + Meter[F] + .observableCounter[Long](s"$prefix.count") + .withDescription("The number of CPU starvation events.") + .createObserver, + Meter[F] + .observableGauge[Long](s"$prefix.clock.drift.current") + .withDescription("The current CPU drift in milliseconds.") + .withUnit("ms") + .createObserver, + Meter[F] + .observableGauge[Long](s"$prefix.clock.drift.max") + .withDescription("The max CPU drift in milliseconds.") + .withUnit("ms") + .createObserver, + ) { (count, driftCurrent, driftMax) => + for { + snapshot <- Sync[F].delay( + ( + metrics.starvationCount(), + metrics.clockDriftCurrent(), + metrics.clockDriftMax() + ) + ) + _ <- count.record(snapshot._1, attributes) + _ <- driftCurrent.record(snapshot._2.toMillis, attributes) + _ <- driftMax.record(snapshot._3.toMillis, attributes) + } yield () + } + } + +} diff --git a/sdk/common/shared/src/main/scala/org/typelevel/otel4s/sdk/autoconfigure/AutoConfigure.scala b/sdk/common/shared/src/main/scala/org/typelevel/otel4s/sdk/autoconfigure/AutoConfigure.scala index 093a95099..4ff0c1eb3 100644 --- a/sdk/common/shared/src/main/scala/org/typelevel/otel4s/sdk/autoconfigure/AutoConfigure.scala +++ b/sdk/common/shared/src/main/scala/org/typelevel/otel4s/sdk/autoconfigure/AutoConfigure.scala @@ -16,7 +16,7 @@ package org.typelevel.otel4s.sdk.autoconfigure -import cats.MonadThrow +import cats.effect.MonadCancelThrow import cats.effect.Resource import cats.syntax.monadError._ @@ -89,7 +89,7 @@ object AutoConfigure { * @tparam A * the type of the component */ - abstract class WithHint[F[_]: MonadThrow, A]( + abstract class WithHint[F[_]: MonadCancelThrow, A]( hint: String, configKeys: Set[Config.Key[_]] ) extends AutoConfigure[F, A] { diff --git a/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/autoconfigure/ExemplarFilterAutoConfigure.scala b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/autoconfigure/ExemplarFilterAutoConfigure.scala index 2c0eda1c6..4f4e91684 100644 --- a/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/autoconfigure/ExemplarFilterAutoConfigure.scala +++ b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/autoconfigure/ExemplarFilterAutoConfigure.scala @@ -16,7 +16,7 @@ package org.typelevel.otel4s.sdk.metrics.autoconfigure -import cats.MonadThrow +import cats.effect.MonadCancelThrow import cats.effect.Resource import org.typelevel.otel4s.sdk.autoconfigure.AutoConfigure import org.typelevel.otel4s.sdk.autoconfigure.Config @@ -41,7 +41,7 @@ import org.typelevel.otel4s.sdk.metrics.exemplar.TraceContextLookup * @see * [[https://opentelemetry.io/docs/languages/java/configuration/#exemplars]] */ -private final class ExemplarFilterAutoConfigure[F[_]: MonadThrow]( +private final class ExemplarFilterAutoConfigure[F[_]: MonadCancelThrow]( lookup: TraceContextLookup ) extends AutoConfigure.WithHint[F, ExemplarFilter]( "ExemplarFilter", @@ -118,7 +118,7 @@ private[sdk] object ExemplarFilterAutoConfigure { * @param traceContextLookup * used by the exemplar reservoir to extract tracing information from the context */ - def apply[F[_]: MonadThrow]( + def apply[F[_]: MonadCancelThrow]( traceContextLookup: TraceContextLookup ): AutoConfigure[F, ExemplarFilter] = new ExemplarFilterAutoConfigure[F](traceContextLookup) diff --git a/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/autoconfigure/MetricExportersAutoConfigure.scala b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/autoconfigure/MetricExportersAutoConfigure.scala index 26283d522..e83f34dc8 100644 --- a/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/autoconfigure/MetricExportersAutoConfigure.scala +++ b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/autoconfigure/MetricExportersAutoConfigure.scala @@ -16,8 +16,8 @@ package org.typelevel.otel4s.sdk.metrics.autoconfigure -import cats.MonadThrow import cats.data.NonEmptyList +import cats.effect.MonadCancelThrow import cats.effect.Resource import cats.effect.std.Console import cats.syntax.applicative._ @@ -41,7 +41,7 @@ import org.typelevel.otel4s.sdk.metrics.exporter.MetricExporter * @see * [[https://opentelemetry.io/docs/languages/java/configuration/#metric-exporters]] */ -private final class MetricExportersAutoConfigure[F[_]: MonadThrow: Console]( +private final class MetricExportersAutoConfigure[F[_]: MonadCancelThrow: Console]( extra: Set[AutoConfigure.Named[F, MetricExporter[F]]] ) extends AutoConfigure.WithHint[F, Map[String, MetricExporter[F]]]( "MetricExporters", @@ -64,7 +64,7 @@ private final class MetricExportersAutoConfigure[F[_]: MonadThrow: Console]( config: Config ): Resource[F, Map[String, MetricExporter[F]]] = { val values = config.getOrElse(ConfigKeys.Exporter, Set.empty[String]) - Resource.eval(MonadThrow[F].fromEither(values)).flatMap { + Resource.eval(MonadCancelThrow[F].fromEither(values)).flatMap { case names if names.contains(Const.NoneExporter) && names.sizeIs > 1 => Resource.raiseError( ConfigurationError( @@ -162,7 +162,7 @@ private[sdk] object MetricExportersAutoConfigure { * @param configurers * the configurers to use */ - def apply[F[_]: MonadThrow: Console]( + def apply[F[_]: MonadCancelThrow: Console]( configurers: Set[AutoConfigure.Named[F, MetricExporter[F]]] ): AutoConfigure[F, Map[String, MetricExporter[F]]] = new MetricExportersAutoConfigure[F](configurers) diff --git a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/autoconfigure/ContextPropagatorsAutoConfigure.scala b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/autoconfigure/ContextPropagatorsAutoConfigure.scala index 86238592a..75a6a5534 100644 --- a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/autoconfigure/ContextPropagatorsAutoConfigure.scala +++ b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/autoconfigure/ContextPropagatorsAutoConfigure.scala @@ -16,8 +16,8 @@ package org.typelevel.otel4s.sdk.trace.autoconfigure -import cats.MonadThrow import cats.data.NonEmptyList +import cats.effect.MonadCancelThrow import cats.effect.Resource import org.typelevel.otel4s.context.propagation.ContextPropagators import org.typelevel.otel4s.context.propagation.TextMapPropagator @@ -43,7 +43,7 @@ import org.typelevel.otel4s.sdk.trace.context.propagation.W3CTraceContextPropaga * @see * [[https://opentelemetry.io/docs/languages/java/configuration/#propagators]] */ -private final class ContextPropagatorsAutoConfigure[F[_]: MonadThrow]( +private final class ContextPropagatorsAutoConfigure[F[_]: MonadCancelThrow]( extra: Set[AutoConfigure.Named[F, TextMapPropagator[Context]]] ) extends AutoConfigure.WithHint[F, ContextPropagators[Context]]( "ContextPropagators", @@ -73,7 +73,7 @@ private final class ContextPropagatorsAutoConfigure[F[_]: MonadThrow]( def fromConfig(config: Config): Resource[F, ContextPropagators[Context]] = { val values = config.getOrElse(ConfigKeys.Propagators, Set.empty[String]) - Resource.eval(MonadThrow[F].fromEither(values)).flatMap { + Resource.eval(MonadCancelThrow[F].fromEither(values)).flatMap { case names if names.contains(Const.NonePropagator) && names.sizeIs > 1 => Resource.raiseError( ConfigurationError( @@ -149,7 +149,7 @@ private[sdk] object ContextPropagatorsAutoConfigure { * @param extra * extra configurers to use */ - def apply[F[_]: MonadThrow]( + def apply[F[_]: MonadCancelThrow]( extra: Set[AutoConfigure.Named[F, TextMapPropagator[Context]]] ): AutoConfigure[F, ContextPropagators[Context]] = new ContextPropagatorsAutoConfigure[F](extra) diff --git a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/autoconfigure/SamplerAutoConfigure.scala b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/autoconfigure/SamplerAutoConfigure.scala index 4ffbe822b..0da582cd9 100644 --- a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/autoconfigure/SamplerAutoConfigure.scala +++ b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/autoconfigure/SamplerAutoConfigure.scala @@ -16,7 +16,7 @@ package org.typelevel.otel4s.sdk.trace.autoconfigure -import cats.MonadThrow +import cats.effect.MonadCancelThrow import cats.effect.Resource import cats.syntax.either._ import org.typelevel.otel4s.sdk.autoconfigure.AutoConfigure @@ -37,7 +37,7 @@ import org.typelevel.otel4s.sdk.trace.samplers.Sampler * @see * [[https://opentelemetry.io/docs/languages/java/configuration/#sampler]] */ -private final class SamplerAutoConfigure[F[_]: MonadThrow]( +private final class SamplerAutoConfigure[F[_]: MonadCancelThrow]( extra: Set[AutoConfigure.Named[F, Sampler[F]]] ) extends AutoConfigure.WithHint[F, Sampler[F]]("Sampler", SamplerAutoConfigure.ConfigKeys.All) { @@ -154,7 +154,7 @@ private[sdk] object SamplerAutoConfigure { * @see * [[https://opentelemetry.io/docs/languages/java/configuration/#sampler]] */ - def apply[F[_]: MonadThrow](extra: Set[AutoConfigure.Named[F, Sampler[F]]]): AutoConfigure[F, Sampler[F]] = + def apply[F[_]: MonadCancelThrow](extra: Set[AutoConfigure.Named[F, Sampler[F]]]): AutoConfigure[F, Sampler[F]] = new SamplerAutoConfigure[F](extra) } diff --git a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/autoconfigure/SpanExportersAutoConfigure.scala b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/autoconfigure/SpanExportersAutoConfigure.scala index 6f8d60267..5d83f019d 100644 --- a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/autoconfigure/SpanExportersAutoConfigure.scala +++ b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/autoconfigure/SpanExportersAutoConfigure.scala @@ -16,8 +16,8 @@ package org.typelevel.otel4s.sdk.trace.autoconfigure -import cats.MonadThrow import cats.data.NonEmptyList +import cats.effect.MonadCancelThrow import cats.effect.Resource import cats.effect.std.Console import cats.syntax.applicative._ @@ -41,7 +41,7 @@ import org.typelevel.otel4s.sdk.trace.exporter.SpanExporter * @see * [[https://opentelemetry.io/docs/languages/java/configuration/#span-exporters]] */ -private final class SpanExportersAutoConfigure[F[_]: MonadThrow: Console]( +private final class SpanExportersAutoConfigure[F[_]: MonadCancelThrow: Console]( extra: Set[AutoConfigure.Named[F, SpanExporter[F]]] ) extends AutoConfigure.WithHint[F, Map[String, SpanExporter[F]]]( "SpanExporters", @@ -62,7 +62,7 @@ private final class SpanExportersAutoConfigure[F[_]: MonadThrow: Console]( def fromConfig(config: Config): Resource[F, Map[String, SpanExporter[F]]] = { val values = config.getOrElse(ConfigKeys.Exporter, Set.empty[String]) - Resource.eval(MonadThrow[F].fromEither(values)).flatMap { + Resource.eval(MonadCancelThrow[F].fromEither(values)).flatMap { case names if names.contains(Const.NoneExporter) && names.sizeIs > 1 => Resource.raiseError( ConfigurationError( @@ -157,7 +157,7 @@ private[sdk] object SpanExportersAutoConfigure { * @param configurers * the configurers to use */ - def apply[F[_]: MonadThrow: Console]( + def apply[F[_]: MonadCancelThrow: Console]( configurers: Set[AutoConfigure.Named[F, SpanExporter[F]]] ): AutoConfigure[F, Map[String, SpanExporter[F]]] = new SpanExportersAutoConfigure[F](configurers) diff --git a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/autoconfigure/SpanLimitsAutoConfigure.scala b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/autoconfigure/SpanLimitsAutoConfigure.scala index a73da2b6a..304e7c44f 100644 --- a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/autoconfigure/SpanLimitsAutoConfigure.scala +++ b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/autoconfigure/SpanLimitsAutoConfigure.scala @@ -16,7 +16,7 @@ package org.typelevel.otel4s.sdk.trace.autoconfigure -import cats.MonadThrow +import cats.effect.MonadCancelThrow import cats.effect.Resource import org.typelevel.otel4s.sdk.autoconfigure.AutoConfigure import org.typelevel.otel4s.sdk.autoconfigure.Config @@ -39,7 +39,7 @@ import org.typelevel.otel4s.sdk.trace.SpanLimits * @see * [[https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#span-limits]] */ -private final class SpanLimitsAutoConfigure[F[_]: MonadThrow] +private final class SpanLimitsAutoConfigure[F[_]: MonadCancelThrow] extends AutoConfigure.WithHint[F, SpanLimits]( "SpanLimits", SpanLimitsAutoConfigure.ConfigKeys.All @@ -100,7 +100,7 @@ private final class SpanLimitsAutoConfigure[F[_]: MonadThrow] withMaxAttributeValueLength.build } - Resource.eval(MonadThrow[F].fromEither(configure)) + Resource.eval(MonadCancelThrow[F].fromEither(configure)) } } @@ -153,7 +153,7 @@ private[sdk] object SpanLimitsAutoConfigure { * @see * [[https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#span-limits]] */ - def apply[F[_]: MonadThrow]: AutoConfigure[F, SpanLimits] = + def apply[F[_]: MonadCancelThrow]: AutoConfigure[F, SpanLimits] = new SpanLimitsAutoConfigure[F] }