Skip to content

Commit

Permalink
Merge pull request #791 from iRevive/benchmarks/bsp
Browse files Browse the repository at this point in the history
benchmarks: add `BatchSpanProcessor` benchmark
  • Loading branch information
iRevive authored Oct 5, 2024
2 parents 8cc7a38 + b9cb779 commit 0ac353d
Show file tree
Hide file tree
Showing 3 changed files with 225 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*
* Copyright 2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.typelevel.otel4s.benchmarks

import cats.Foldable
import cats.effect.IO
import cats.effect.Resource
import cats.effect.std.Random
import cats.effect.unsafe.implicits.global
import cats.syntax.foldable._
import org.openjdk.jmh.annotations._

import java.util.concurrent.TimeUnit
import scala.concurrent.duration._

// benchmarks/Jmh/run org.typelevel.otel4s.benchmarks.BatchSpanExporterBenchmark -prof gc
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
@Threads(5)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
class BatchSpanProcessorBenchmark {

import BatchSpanProcessorBenchmark._

@Param(Array("oteljava", "sdk"))
var backend: String = _

@Param(Array("0", "1", "5"))
var delayMs: Int = _

@Param(Array("1000", "2000", "5000"))
var spanCount: Int = _

private var processor: Processor = _
private var finalizer: IO[Unit] = _

@Benchmark
def doExport(): Unit =
processor.doExport()

@Setup(Level.Trial)
def setup(): Unit =
backend match {
case "oteljava" =>
val (proc, release) = Processor.otelJava(delayMs.millis, spanCount).allocated.unsafeRunSync()

processor = proc
finalizer = release

case "sdk" =>
val (proc, release) = Processor.sdk(delayMs.millis, spanCount).allocated.unsafeRunSync()

processor = proc
finalizer = release

case other =>
sys.error(s"unknown backend [$other]")
}

@TearDown(Level.Trial)
def cleanup(): Unit =
finalizer.unsafeRunSync()
}

object BatchSpanProcessorBenchmark {

trait Processor {
def doExport(): Unit
}

object Processor {

def otelJava(delay: FiniteDuration, spanCount: Int): Resource[IO, Processor] = {
import io.opentelemetry.api.trace.Span
import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.trace.{ReadableSpan, SdkTracerProvider}
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.sdk.trace.`export`.{BatchSpanProcessor, SpanExporter}
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService

def toIO(codeIO: IO[CompletableResultCode]): IO[Unit] =
codeIO.flatMap { code =>
IO.async[Unit] { cb =>
IO.delay {
code.whenComplete { () =>
cb(Either.cond(code.isSuccess, (), new RuntimeException("OpenTelemetry SDK async operation failed")))
}
None
}
}
}

def exporter(executor: ScheduledExecutorService): SpanExporter = new SpanExporter {
def `export`(spans: java.util.Collection[SpanData]): CompletableResultCode = {
val result = new CompletableResultCode()
executor.schedule(() => result.succeed(), delay.toMillis, TimeUnit.MILLISECONDS)
result
}

def flush(): CompletableResultCode =
CompletableResultCode.ofSuccess()

def shutdown(): CompletableResultCode =
CompletableResultCode.ofSuccess()
}

val tracer = SdkTracerProvider.builder().build().get("benchmarkTracer")

val spans: Vector[Span] =
Vector.fill(spanCount)(tracer.spanBuilder("span").startSpan())

def makeBsp(executor: ScheduledExecutorService) =
BatchSpanProcessor
.builder(exporter(executor))
.setMaxQueueSize(spanCount * 2)
.build

for {
executor <- Resource.make(IO.delay(Executors.newScheduledThreadPool(5)))(e => IO.delay(e.shutdown()))
bsp <- Resource.make(IO.delay(makeBsp(executor)))(r => toIO(IO.delay(r.shutdown())))
} yield new Processor {
def doExport(): Unit = {
spans.foreach(span => bsp.onEnd(span.asInstanceOf[ReadableSpan]))
val _ = bsp.forceFlush().join(10, TimeUnit.MINUTES)
()
}
}
}

def sdk(delay: FiniteDuration, spanCount: Int): Resource[IO, Processor] = {
import org.typelevel.otel4s.trace.{TraceFlags, TraceState}
import org.typelevel.otel4s.trace.{SpanContext, SpanKind}
import org.typelevel.otel4s.sdk.TelemetryResource
import org.typelevel.otel4s.sdk.common.InstrumentationScope
import org.typelevel.otel4s.sdk.trace.IdGenerator
import org.typelevel.otel4s.sdk.trace.data.{LimitedData, SpanData, StatusData}
import org.typelevel.otel4s.sdk.trace.exporter.SpanExporter
import org.typelevel.otel4s.sdk.trace.processor.BatchSpanProcessor

val exporter: SpanExporter[IO] = new SpanExporter[IO] {
def name: String = s"DelayExporter($delay)"
def exportSpans[G[_]: Foldable](spans: G[SpanData]): IO[Unit] = IO.sleep(delay)
def flush: IO[Unit] = IO.unit
}

def mkSpanData(idGenerator: IdGenerator[IO], random: Random[IO]): IO[SpanData] =
for {
name <- random.nextString(20)
traceId <- idGenerator.generateTraceId
spanId <- idGenerator.generateSpanId
} yield SpanData(
name = name,
spanContext = SpanContext(traceId, spanId, TraceFlags.Sampled, TraceState.empty, remote = false),
parentSpanContext = None,
kind = SpanKind.Internal,
startTimestamp = Duration.Zero,
endTimestamp = None,
status = StatusData.Ok,
attributes = LimitedData.attributes(Int.MaxValue, 1024),
events = LimitedData.events(Int.MaxValue),
links = LimitedData.links(Int.MaxValue),
instrumentationScope = InstrumentationScope.empty,
resource = TelemetryResource.empty
)

for {
bsp <- BatchSpanProcessor.builder[IO](exporter).withMaxQueueSize(spanCount * 2).build
spans <- Resource.eval(
Random.scalaUtilRandom[IO].flatMap { implicit random =>
val generator = IdGenerator.random[IO]
mkSpanData(generator, random).replicateA(spanCount)
}
)
} yield new Processor {
def doExport(): Unit =
(spans.traverse_(span => bsp.onEnd(span)) >> bsp.forceFlush).unsafeRunSync()
}
}

}

}
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ lazy val benchmarks = project
.enablePlugins(NoPublishPlugin)
.enablePlugins(JmhPlugin)
.in(file("benchmarks"))
.dependsOn(core.jvm, sdk.jvm, oteljava)
.dependsOn(core.jvm, sdk.jvm, `sdk-testkit`.jvm, oteljava)
.settings(
name := "otel4s-benchmarks",
libraryDependencies ++= Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ private final class BatchSpanProcessor[F[_]: Temporal: Console] private (
) extends SpanProcessor[F] {
import BatchSpanProcessor.State

private val unit = Temporal[F].unit

val name: String = "BatchSpanProcessor{" +
s"exporter=${exporter.name}, " +
s"scheduleDelay=${config.scheduleDelay}, " +
Expand All @@ -69,29 +71,27 @@ private final class BatchSpanProcessor[F[_]: Temporal: Console] private (
val isEndRequired: Boolean = true

def onStart(parentContext: Option[SpanContext], span: SpanRef[F]): F[Unit] =
Temporal[F].unit

def onEnd(span: SpanData): F[Unit] = {
val canExport = span.spanContext.isSampled

// if 'spansNeeded' is defined, it means the worker is waiting for a certain number of spans
// and it waits for the 'signal'-latch to be released
// hence, if the queue size is >= than the number of needed spans, the latch can be released
def notifyWorker: F[Unit] =
(queue.size, state.get)
.mapN { (queueSize, state) =>
state.spansNeeded.exists(needed => queueSize >= needed)
}
.ifM(signal.release, Temporal[F].unit)
unit

def onEnd(span: SpanData): F[Unit] =
if (span.spanContext.isSampled) {
// if 'spansNeeded' is defined, it means the worker is waiting for a certain number of spans
// and it waits for the 'signal'-latch to be released
// hence, if the queue size is >= than the number of needed spans, the latch can be released
def notifyWorker: F[Unit] =
for {
queueSize <- queue.size
state <- state.get
_ <- if (state.spansNeeded.exists(needed => queueSize >= needed)) signal.release else unit
} yield ()

def enqueue =
for {
offered <- queue.tryOffer(span)
_ <- notifyWorker.whenA(offered)
_ <- if (offered) notifyWorker else unit
} yield ()

enqueue.whenA(canExport)
}
} else {
unit
}

def forceFlush: F[Unit] =
exportAll
Expand All @@ -117,12 +117,14 @@ private final class BatchSpanProcessor[F[_]: Temporal: Console] private (
val request =
for {
_ <- state.update(_.copy(spansNeeded = Some(spansNeeded)))
_ <- signal.await.timeoutTo(pollWaitTime, Temporal[F].unit)
_ <- signal.await.timeoutTo(pollWaitTime, unit)
} yield ()

poll(request)
.guarantee(state.update(_.copy(spansNeeded = None)))
.whenA(pollWaitTime > Duration.Zero)
if (pollWaitTime > Duration.Zero) {
poll(request).guarantee(state.update(_.copy(spansNeeded = None)))
} else {
unit
}
}

for {
Expand Down

0 comments on commit 0ac353d

Please sign in to comment.