From fd4b7e712f43abe7a15fc6501a43163af6e2700a Mon Sep 17 00:00:00 2001 From: mamunto Date: Mon, 11 Nov 2024 15:52:26 -0500 Subject: [PATCH 1/5] added metrics for drop and export Span processor --- .../Metrics/Stable/DefaultStableMeter.swift | 20 ++- .../Stable/DefaultStableMeterProvider.swift | 2 + .../Metrics/Stable/LongCounterBuilder.swift | 2 + .../Metrics/Stable/LongGaugeBuilder.swift | 2 + .../Stable/LongCounterMeterBuilderSdk.swift | 10 ++ .../Metrics/Stable/LongGaugeBuilderSdk.swift | 10 +- .../SpanProcessors/BatchSpanProcessor.swift | 149 ++++++++++++------ .../Export/BatchSpansProcessorTests.swift | 51 +++++- 8 files changed, 190 insertions(+), 56 deletions(-) diff --git a/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift b/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift index f13a20ae..ca56fda6 100644 --- a/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift +++ b/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift @@ -60,7 +60,15 @@ public class DefaultStableMeter : StableMeter { } } - private class NoopLongGaugeBuilder : LongGaugeBuilder { + private class NoopLongGaugeBuilder : LongGaugeBuilder { + func setUnit(_ unit: String) -> any LongGaugeBuilder { + self + } + + func setDescription(_ description: String) -> any LongGaugeBuilder { + self + } + func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongGauge { NoopObservableLongGauge() } @@ -123,7 +131,15 @@ public class DefaultStableMeter : StableMeter { func add(value: Int, attribute: [String : AttributeValue]) {} } - private class NoopLongCounterBuilder : LongCounterBuilder { + private class NoopLongCounterBuilder : LongCounterBuilder { + func setUnit(_ unit: String) -> any LongCounterBuilder { + self + } + + func setDescription(_ description: String) -> any LongCounterBuilder { + self + } + func ofDoubles() -> DoubleCounterBuilder { NoopDoubleCounterBuilder() } diff --git a/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeterProvider.swift b/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeterProvider.swift index 9cca201f..14634a3e 100644 --- a/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeterProvider.swift +++ b/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeterProvider.swift @@ -8,6 +8,8 @@ import Foundation public class DefaultStableMeterProvider: StableMeterProvider { static let noopMeterBuilder = NoopMeterBuilder() + public init() {} + public static func noop() -> MeterBuilder { noopMeterBuilder } diff --git a/Sources/OpenTelemetryApi/Metrics/Stable/LongCounterBuilder.swift b/Sources/OpenTelemetryApi/Metrics/Stable/LongCounterBuilder.swift index f6181dd1..fcb7995b 100644 --- a/Sources/OpenTelemetryApi/Metrics/Stable/LongCounterBuilder.swift +++ b/Sources/OpenTelemetryApi/Metrics/Stable/LongCounterBuilder.swift @@ -6,6 +6,8 @@ import Foundation public protocol LongCounterBuilder : AnyObject { + func setUnit(_ unit: String) -> LongCounterBuilder + func setDescription(_ description: String) -> LongCounterBuilder func ofDoubles() -> DoubleCounterBuilder func build() -> LongCounter func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongCounter diff --git a/Sources/OpenTelemetryApi/Metrics/Stable/LongGaugeBuilder.swift b/Sources/OpenTelemetryApi/Metrics/Stable/LongGaugeBuilder.swift index 42bf644c..b61cc7b1 100644 --- a/Sources/OpenTelemetryApi/Metrics/Stable/LongGaugeBuilder.swift +++ b/Sources/OpenTelemetryApi/Metrics/Stable/LongGaugeBuilder.swift @@ -6,5 +6,7 @@ import Foundation public protocol LongGaugeBuilder : AnyObject { + func setUnit(_ unit: String) -> LongGaugeBuilder + func setDescription(_ description: String) -> LongGaugeBuilder func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongGauge } diff --git a/Sources/OpenTelemetrySdk/Metrics/Stable/LongCounterMeterBuilderSdk.swift b/Sources/OpenTelemetrySdk/Metrics/Stable/LongCounterMeterBuilderSdk.swift index 92dde2d7..39a19cb9 100644 --- a/Sources/OpenTelemetrySdk/Metrics/Stable/LongCounterMeterBuilderSdk.swift +++ b/Sources/OpenTelemetrySdk/Metrics/Stable/LongCounterMeterBuilderSdk.swift @@ -41,4 +41,14 @@ public class LongCounterMeterBuilderSdk: LongCounterBuilder, InstrumentBuilder { -> OpenTelemetryApi.ObservableLongCounter { registerLongAsynchronousInstrument(type: .observableCounter, updater: callback) } + + public func setDescription(_ description: String) -> LongCounterBuilder { + self.description = description + return self + } + + public func setUnit(_ unit: String) -> LongCounterBuilder { + self.unit = unit + return self + } } diff --git a/Sources/OpenTelemetrySdk/Metrics/Stable/LongGaugeBuilderSdk.swift b/Sources/OpenTelemetrySdk/Metrics/Stable/LongGaugeBuilderSdk.swift index fa9c0e96..9d2087da 100644 --- a/Sources/OpenTelemetrySdk/Metrics/Stable/LongGaugeBuilderSdk.swift +++ b/Sources/OpenTelemetrySdk/Metrics/Stable/LongGaugeBuilderSdk.swift @@ -34,7 +34,13 @@ public class LongGaugeBuilderSdk : LongGaugeBuilder, InstrumentBuilder { registerLongAsynchronousInstrument(type: type, updater: callback) } - - + public func setDescription(_ description: String) -> LongGaugeBuilder { + self.description = description + return self + } + public func setUnit(_ unit: String) -> LongGaugeBuilder { + self.unit = unit + return self + } } diff --git a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift index a393e3d9..b87824db 100644 --- a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift +++ b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift @@ -15,21 +15,36 @@ import OpenTelemetryApi /// exports the spans to wake up and start a new export cycle. /// This batchSpanProcessor can cause high contention in a very high traffic service. public struct BatchSpanProcessor: SpanProcessor { + fileprivate static let SPAN_PROCESSOR_TYPE_LABEL: String = "processorType" + fileprivate static let SPAN_PROCESSOR_DROPPED_LABEL: String = "dropped" + fileprivate static let SPAN_PROCESSOR_TYPE_VALUE: String = BatchSpanProcessor.name + + fileprivate var worker: BatchWorker - - fileprivate var worker: BatchWorker - - public init(spanExporter: SpanExporter, scheduleDelay: TimeInterval = 5, exportTimeout: TimeInterval = 30, - maxQueueSize: Int = 2048, maxExportBatchSize: Int = 512, willExportCallback: ((inout [SpanData]) -> Void)? = nil) - { - worker = BatchWorker(spanExporter: spanExporter, - scheduleDelay: scheduleDelay, - exportTimeout: exportTimeout, - maxQueueSize: maxQueueSize, - maxExportBatchSize: maxExportBatchSize, - willExportCallback: willExportCallback) - worker.start() - } + public static var name: String { + String(describing: Self.self) + } + + public init( + spanExporter: SpanExporter, + meterProvider: StableMeterProvider, + scheduleDelay: TimeInterval = 5, + exportTimeout: TimeInterval = 30, + maxQueueSize: Int = 2048, + maxExportBatchSize: Int = 512, + willExportCallback: ((inout [SpanData]) -> Void)? = nil + ) { + worker = BatchWorker( + spanExporter: spanExporter, + meterProvider: meterProvider, + scheduleDelay: scheduleDelay, + exportTimeout: exportTimeout, + maxQueueSize: maxQueueSize, + maxExportBatchSize: maxExportBatchSize, + willExportCallback: willExportCallback + ) + worker.start() + } public let isStartRequired = false public let isEndRequired = true @@ -57,36 +72,77 @@ public struct BatchSpanProcessor: SpanProcessor { /// the data. /// The list of batched data is protected by a NSCondition which ensures full concurrency. private class BatchWorker: Thread { - let spanExporter: SpanExporter - let scheduleDelay: TimeInterval - let maxQueueSize: Int - let exportTimeout: TimeInterval - let maxExportBatchSize: Int - let willExportCallback: ((inout [SpanData]) -> Void)? - let halfMaxQueueSize: Int - private let cond = NSCondition() - var spanList = [ReadableSpan]() - var queue: OperationQueue - - init(spanExporter: SpanExporter, scheduleDelay: TimeInterval, exportTimeout: TimeInterval, maxQueueSize: Int, maxExportBatchSize: Int, willExportCallback: ((inout [SpanData]) -> Void)?) { - self.spanExporter = spanExporter - self.scheduleDelay = scheduleDelay - self.exportTimeout = exportTimeout - self.maxQueueSize = maxQueueSize - halfMaxQueueSize = maxQueueSize >> 1 - self.maxExportBatchSize = maxExportBatchSize - self.willExportCallback = willExportCallback - queue = OperationQueue() - queue.name = "BatchWorker Queue" - queue.maxConcurrentOperationCount = 1 - } + let spanExporter: SpanExporter + let meterProvider: StableMeterProvider + let scheduleDelay: TimeInterval + let maxQueueSize: Int + let exportTimeout: TimeInterval + let maxExportBatchSize: Int + let willExportCallback: ((inout [SpanData]) -> Void)? + let halfMaxQueueSize: Int + private let cond = NSCondition() + var spanList = [ReadableSpan]() + var queue: OperationQueue + + private var queueSizeGauge: ObservableLongGauge? + private var processedSpansCounter: LongCounter + private let droppedAttrs: [String: AttributeValue] + private let exportedAttrs: [String: AttributeValue] + init( + spanExporter: SpanExporter, + meterProvider: StableMeterProvider, + scheduleDelay: TimeInterval, + exportTimeout: TimeInterval, + maxQueueSize: Int, + maxExportBatchSize: Int, + willExportCallback: ((inout [SpanData]) -> Void)? + ) { + self.spanExporter = spanExporter + self.meterProvider = meterProvider + self.scheduleDelay = scheduleDelay + self.exportTimeout = exportTimeout + self.maxQueueSize = maxQueueSize + halfMaxQueueSize = maxQueueSize >> 1 + self.maxExportBatchSize = maxExportBatchSize + self.willExportCallback = willExportCallback + queue = OperationQueue() + queue.name = "BatchWorker Queue" + queue.maxConcurrentOperationCount = 1 + + let meter = meterProvider.meterBuilder(name: "io.opentelemetry.sdk.trace").build() + self.queueSizeGauge = meter.gaugeBuilder(name: "queueSize") + .ofLongs() + .setDescription("The number of items queued") + .setUnit("1") + .buildWithCallback { result in + result.record( + value: maxQueueSize, + attributes: [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) + ] + ) + } + + processedSpansCounter = meter.counterBuilder(name: "processedSpans") + .setUnit("1") + .setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]") + .build() + droppedAttrs = [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), + BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true) + ] + exportedAttrs = [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), + BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(false) + ] + } func addSpan(span: ReadableSpan) { cond.lock() defer { cond.unlock() } if spanList.count == maxQueueSize { - // TODO: Record a counter for dropped spans. + processedSpansCounter.add(value: 1, attribute: droppedAttrs) return } // TODO: Record a gauge for referenced spans. @@ -148,11 +204,16 @@ private class BatchWorker: Thread { timeoutTimer.cancel() } - private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) { - stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach { - var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() } - willExportCallback?(&spansToExport) - spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout) + private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) { + stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach { + var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() } + willExportCallback?(&spansToExport) + let result = spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout) + if result == .success { + cond.lock() + processedSpansCounter.add(value: spanList.count, attribute: exportedAttrs) + cond.unlock() + } + } } - } } diff --git a/Tests/OpenTelemetrySdkTests/Trace/Export/BatchSpansProcessorTests.swift b/Tests/OpenTelemetrySdkTests/Trace/Export/BatchSpansProcessorTests.swift index 47507f0e..68d5355e 100644 --- a/Tests/OpenTelemetrySdkTests/Trace/Export/BatchSpansProcessorTests.swift +++ b/Tests/OpenTelemetrySdkTests/Trace/Export/BatchSpansProcessorTests.swift @@ -44,7 +44,10 @@ class BatchSpansProcessorTests: XCTestCase { } func testStartEndRequirements() { - let spansProcessor = BatchSpanProcessor(spanExporter: WaitingSpanExporter(numberToWaitFor: 0)) + let spansProcessor = BatchSpanProcessor( + spanExporter: WaitingSpanExporter(numberToWaitFor: 0), + meterProvider: DefaultStableMeterProvider() + ) XCTAssertFalse(spansProcessor.isStartRequired) XCTAssertTrue(spansProcessor.isEndRequired) } @@ -52,7 +55,11 @@ class BatchSpansProcessorTests: XCTestCase { func testExportDifferentSampledSpans() { let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 2) - tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: maxScheduleDelay)) + tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( + spanExporter: waitingSpanExporter, + meterProvider: DefaultStableMeterProvider(), + scheduleDelay: maxScheduleDelay) + ) let span1 = createSampledEndedSpan(spanName: spanName1) let span2 = createSampledEndedSpan(spanName: spanName2) let exported = waitingSpanExporter.waitForExport() @@ -63,7 +70,12 @@ class BatchSpansProcessorTests: XCTestCase { func testExportMoreSpansThanTheBufferSize() { let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 6) - tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: maxScheduleDelay, maxQueueSize: 6, maxExportBatchSize: 2)) + tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( + spanExporter: waitingSpanExporter, + meterProvider: DefaultStableMeterProvider(), + scheduleDelay: maxScheduleDelay, + maxQueueSize: 6, maxExportBatchSize: 2) + ) let span1 = createSampledEndedSpan(spanName: spanName1) let span2 = createSampledEndedSpan(spanName: spanName1) @@ -82,7 +94,13 @@ class BatchSpansProcessorTests: XCTestCase { func testForceExport() { let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1) - let batchSpansProcessor = BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: 10, maxQueueSize: 10000, maxExportBatchSize: 2000) + let batchSpansProcessor = BatchSpanProcessor( + spanExporter: waitingSpanExporter, + meterProvider: DefaultStableMeterProvider(), + scheduleDelay: 10, + maxQueueSize: 10000, + maxExportBatchSize: 2000 + ) tracerSdkFactory.addSpanProcessor(batchSpansProcessor) for _ in 0 ..< 100 { @@ -96,7 +114,10 @@ class BatchSpansProcessorTests: XCTestCase { func testExportSpansToMultipleServices() { let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 2) let waitingSpanExporter2 = WaitingSpanExporter(numberToWaitFor: 2) - tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, waitingSpanExporter2]), scheduleDelay: maxScheduleDelay)) + tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( + spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, waitingSpanExporter2]), + meterProvider: DefaultStableMeterProvider(), + scheduleDelay: maxScheduleDelay)) let span1 = createSampledEndedSpan(spanName: spanName1) let span2 = createSampledEndedSpan(spanName: spanName2) @@ -110,7 +131,13 @@ class BatchSpansProcessorTests: XCTestCase { let maxQueuedSpans = 8 let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: maxQueuedSpans) - tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, blockingSpanExporter]), scheduleDelay: maxScheduleDelay, maxQueueSize: maxQueuedSpans, maxExportBatchSize: maxQueuedSpans / 2)) + tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( + spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, blockingSpanExporter]), + meterProvider: DefaultStableMeterProvider(), + scheduleDelay: maxScheduleDelay, + maxQueueSize: maxQueuedSpans, + maxExportBatchSize: maxQueuedSpans / 2) + ) var spansToExport = [SpanData]() // Wait to block the worker thread in the BatchSampledSpansProcessor. This ensures that no items @@ -162,7 +189,11 @@ class BatchSpansProcessorTests: XCTestCase { func testExportNotSampledSpans() { let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1) - tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: maxScheduleDelay)) + tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( + spanExporter: waitingSpanExporter, + meterProvider: DefaultStableMeterProvider(), + scheduleDelay: maxScheduleDelay) + ) createNotSampledEndedSpan(spanName: spanName1) createNotSampledEndedSpan(spanName: spanName2) @@ -181,7 +212,11 @@ class BatchSpansProcessorTests: XCTestCase { let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1) // Set the export delay to zero, for no timeout, in order to confirm the #flush() below works - tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: 0.1)) + tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( + spanExporter: waitingSpanExporter, + meterProvider: DefaultStableMeterProvider(), + scheduleDelay: 0.1) + ) let span2 = createSampledEndedSpan(spanName: spanName2) From 6e92546a2ff3cd051309219eb99aaead9fe2b3ed Mon Sep 17 00:00:00 2001 From: mamunto Date: Wed, 20 Nov 2024 11:20:24 -0500 Subject: [PATCH 2/5] added span size gauge --- .../Metrics/Stable/DefaultStableMeter.swift | 30 ++---- .../Metrics/Stable/LongCounterBuilder.swift | 2 - .../Metrics/Stable/LongGaugeBuilder.swift | 2 - .../Stable/LongCounterMeterBuilderSdk.swift | 10 -- .../Metrics/Stable/LongGaugeBuilderSdk.swift | 10 -- .../SpanProcessors/BatchSpanProcessor.swift | 93 +++++++++++++++---- 6 files changed, 81 insertions(+), 66 deletions(-) diff --git a/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift b/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift index ca56fda6..f13e37ff 100644 --- a/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift +++ b/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift @@ -61,14 +61,6 @@ public class DefaultStableMeter : StableMeter { } private class NoopLongGaugeBuilder : LongGaugeBuilder { - func setUnit(_ unit: String) -> any LongGaugeBuilder { - self - } - - func setDescription(_ description: String) -> any LongGaugeBuilder { - self - } - func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongGauge { NoopObservableLongGauge() } @@ -132,25 +124,17 @@ public class DefaultStableMeter : StableMeter { } private class NoopLongCounterBuilder : LongCounterBuilder { - func setUnit(_ unit: String) -> any LongCounterBuilder { - self + func ofDoubles() -> DoubleCounterBuilder { + NoopDoubleCounterBuilder() } - func setDescription(_ description: String) -> any LongCounterBuilder { - self + func build() -> LongCounter { + NoopLongCounter() } - func ofDoubles() -> DoubleCounterBuilder { - NoopDoubleCounterBuilder() - } - - func build() -> LongCounter { - NoopLongCounter() - } - - func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongCounter { - NoopObservableLongCounter() - } + func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongCounter { + NoopObservableLongCounter() + } } private class NoopDoubleCounterBuilder : DoubleCounterBuilder { diff --git a/Sources/OpenTelemetryApi/Metrics/Stable/LongCounterBuilder.swift b/Sources/OpenTelemetryApi/Metrics/Stable/LongCounterBuilder.swift index fcb7995b..f6181dd1 100644 --- a/Sources/OpenTelemetryApi/Metrics/Stable/LongCounterBuilder.swift +++ b/Sources/OpenTelemetryApi/Metrics/Stable/LongCounterBuilder.swift @@ -6,8 +6,6 @@ import Foundation public protocol LongCounterBuilder : AnyObject { - func setUnit(_ unit: String) -> LongCounterBuilder - func setDescription(_ description: String) -> LongCounterBuilder func ofDoubles() -> DoubleCounterBuilder func build() -> LongCounter func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongCounter diff --git a/Sources/OpenTelemetryApi/Metrics/Stable/LongGaugeBuilder.swift b/Sources/OpenTelemetryApi/Metrics/Stable/LongGaugeBuilder.swift index b61cc7b1..42bf644c 100644 --- a/Sources/OpenTelemetryApi/Metrics/Stable/LongGaugeBuilder.swift +++ b/Sources/OpenTelemetryApi/Metrics/Stable/LongGaugeBuilder.swift @@ -6,7 +6,5 @@ import Foundation public protocol LongGaugeBuilder : AnyObject { - func setUnit(_ unit: String) -> LongGaugeBuilder - func setDescription(_ description: String) -> LongGaugeBuilder func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongGauge } diff --git a/Sources/OpenTelemetrySdk/Metrics/Stable/LongCounterMeterBuilderSdk.swift b/Sources/OpenTelemetrySdk/Metrics/Stable/LongCounterMeterBuilderSdk.swift index 39a19cb9..92dde2d7 100644 --- a/Sources/OpenTelemetrySdk/Metrics/Stable/LongCounterMeterBuilderSdk.swift +++ b/Sources/OpenTelemetrySdk/Metrics/Stable/LongCounterMeterBuilderSdk.swift @@ -41,14 +41,4 @@ public class LongCounterMeterBuilderSdk: LongCounterBuilder, InstrumentBuilder { -> OpenTelemetryApi.ObservableLongCounter { registerLongAsynchronousInstrument(type: .observableCounter, updater: callback) } - - public func setDescription(_ description: String) -> LongCounterBuilder { - self.description = description - return self - } - - public func setUnit(_ unit: String) -> LongCounterBuilder { - self.unit = unit - return self - } } diff --git a/Sources/OpenTelemetrySdk/Metrics/Stable/LongGaugeBuilderSdk.swift b/Sources/OpenTelemetrySdk/Metrics/Stable/LongGaugeBuilderSdk.swift index 9d2087da..54721426 100644 --- a/Sources/OpenTelemetrySdk/Metrics/Stable/LongGaugeBuilderSdk.swift +++ b/Sources/OpenTelemetrySdk/Metrics/Stable/LongGaugeBuilderSdk.swift @@ -33,14 +33,4 @@ public class LongGaugeBuilderSdk : LongGaugeBuilder, InstrumentBuilder { public func buildWithCallback(_ callback: @escaping (OpenTelemetryApi.ObservableLongMeasurement) -> Void) -> OpenTelemetryApi.ObservableLongGauge { registerLongAsynchronousInstrument(type: type, updater: callback) } - - public func setDescription(_ description: String) -> LongGaugeBuilder { - self.description = description - return self - } - - public func setUnit(_ unit: String) -> LongGaugeBuilder { - self.unit = unit - return self - } } diff --git a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift index b87824db..f4688858 100644 --- a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift +++ b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift @@ -85,9 +85,12 @@ private class BatchWorker: Thread { var queue: OperationQueue private var queueSizeGauge: ObservableLongGauge? - private var processedSpansCounter: LongCounter + private var spanGaugeObserver: ObservableLongGauge? + + private var processedSpansCounter: LongCounter? private let droppedAttrs: [String: AttributeValue] private let exportedAttrs: [String: AttributeValue] + private let spanGaugeBuilder: LongGaugeBuilder init( spanExporter: SpanExporter, meterProvider: StableMeterProvider, @@ -110,23 +113,33 @@ private class BatchWorker: Thread { queue.maxConcurrentOperationCount = 1 let meter = meterProvider.meterBuilder(name: "io.opentelemetry.sdk.trace").build() - self.queueSizeGauge = meter.gaugeBuilder(name: "queueSize") + do { + self.queueSizeGauge = try meter.gaugeBuilder(name: "queueSize") + .ofLongs() + .asLongSdk() + .setDescription("The number of items queued") + .setUnit("1") + .buildWithCallback { result in + result.record( + value: maxQueueSize, + attributes: [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) + ] + ) + } + } catch {} + + self.spanGaugeBuilder = meter.gaugeBuilder(name: "spanSize") .ofLongs() - .setDescription("The number of items queued") - .setUnit("1") - .buildWithCallback { result in - result.record( - value: maxQueueSize, - attributes: [ - BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) - ] - ) - } - processedSpansCounter = meter.counterBuilder(name: "processedSpans") - .setUnit("1") - .setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]") - .build() + do { + processedSpansCounter = try meter.counterBuilder(name: "processedSpans") + .asLongSdk() + .setUnit("1") + .setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]") + .build() + } catch {} + droppedAttrs = [ BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true) @@ -137,16 +150,35 @@ private class BatchWorker: Thread { ] } + deinit { + // Cleanup all gauge observer + self.queueSizeGauge?.close() + self.spanGaugeObserver?.close() + } + func addSpan(span: ReadableSpan) { cond.lock() defer { cond.unlock() } if spanList.count == maxQueueSize { - processedSpansCounter.add(value: 1, attribute: droppedAttrs) + processedSpansCounter?.add(value: 1, attribute: droppedAttrs) return } - // TODO: Record a gauge for referenced spans. spanList.append(span) + + // If there is any observer before, let's close it + self.spanGaugeObserver?.close() + // Subscribe to new gauge observer + self.spanGaugeObserver = self.spanGaugeBuilder + .buildWithCallback { [count = spanList.count] result in + result.record( + value: count, + attributes: [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) + ] + ) + } + // Notify the worker thread that at half of the queue is available. It will take // time anyway for the thread to wake up. if spanList.count >= halfMaxQueueSize { @@ -211,9 +243,32 @@ private class BatchWorker: Thread { let result = spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout) if result == .success { cond.lock() - processedSpansCounter.add(value: spanList.count, attribute: exportedAttrs) + processedSpansCounter?.add(value: spanList.count, attribute: exportedAttrs) cond.unlock() } } } } + +public enum GuageError: Error { + case invalidType +} + +/// Helper function to handle +extension LongGaugeBuilder { + public func asLongSdk() throws -> LongGaugeBuilderSdk { + guard let sdkType = self as? LongGaugeBuilderSdk else { + throw GuageError.invalidType + } + return sdkType + } +} + +extension LongCounterBuilder { + public func asLongSdk() throws -> LongCounterMeterBuilderSdk { + guard let sdkType = self as? LongCounterMeterBuilderSdk else { + throw GuageError.invalidType + } + return sdkType + } +} From d968d8a8fc86413484da98706cd36b1a77c0783a Mon Sep 17 00:00:00 2001 From: mamunto Date: Fri, 22 Nov 2024 11:35:46 -0500 Subject: [PATCH 3/5] addressed PR comments --- .../Stable/DefaultStableMeterProvider.swift | 2 - .../SpanProcessors/BatchSpanProcessor.swift | 63 +++++-------------- .../Export/BatchSpansProcessorTests.swift | 16 ++--- 3 files changed, 25 insertions(+), 56 deletions(-) diff --git a/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeterProvider.swift b/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeterProvider.swift index 14634a3e..9cca201f 100644 --- a/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeterProvider.swift +++ b/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeterProvider.swift @@ -8,8 +8,6 @@ import Foundation public class DefaultStableMeterProvider: StableMeterProvider { static let noopMeterBuilder = NoopMeterBuilder() - public init() {} - public static func noop() -> MeterBuilder { noopMeterBuilder } diff --git a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift index f4688858..9ad3ae1b 100644 --- a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift +++ b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift @@ -113,32 +113,26 @@ private class BatchWorker: Thread { queue.maxConcurrentOperationCount = 1 let meter = meterProvider.meterBuilder(name: "io.opentelemetry.sdk.trace").build() - do { - self.queueSizeGauge = try meter.gaugeBuilder(name: "queueSize") - .ofLongs() - .asLongSdk() - .setDescription("The number of items queued") - .setUnit("1") - .buildWithCallback { result in - result.record( - value: maxQueueSize, - attributes: [ - BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) - ] - ) - } - } catch {} - + + var longGaugeSdk = meter.gaugeBuilder(name: "queueSize").ofLongs() as? LongGaugeBuilderSdk + longGaugeSdk = longGaugeSdk?.setDescription("The number of items queued") + longGaugeSdk = longGaugeSdk?.setUnit("1") + self.queueSizeGauge = longGaugeSdk?.buildWithCallback { result in + result.record( + value: maxQueueSize, + attributes: [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) + ] + ) + } + self.spanGaugeBuilder = meter.gaugeBuilder(name: "spanSize") .ofLongs() - do { - processedSpansCounter = try meter.counterBuilder(name: "processedSpans") - .asLongSdk() - .setUnit("1") - .setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]") - .build() - } catch {} + var longCounterSdk = meter.counterBuilder(name: "processedSpans") as? LongCounterMeterBuilderSdk + longCounterSdk = longCounterSdk?.setUnit("1") + longCounterSdk = longCounterSdk?.setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]") + processedSpansCounter = longCounterSdk?.build() droppedAttrs = [ BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), @@ -249,26 +243,3 @@ private class BatchWorker: Thread { } } } - -public enum GuageError: Error { - case invalidType -} - -/// Helper function to handle -extension LongGaugeBuilder { - public func asLongSdk() throws -> LongGaugeBuilderSdk { - guard let sdkType = self as? LongGaugeBuilderSdk else { - throw GuageError.invalidType - } - return sdkType - } -} - -extension LongCounterBuilder { - public func asLongSdk() throws -> LongCounterMeterBuilderSdk { - guard let sdkType = self as? LongCounterMeterBuilderSdk else { - throw GuageError.invalidType - } - return sdkType - } -} diff --git a/Tests/OpenTelemetrySdkTests/Trace/Export/BatchSpansProcessorTests.swift b/Tests/OpenTelemetrySdkTests/Trace/Export/BatchSpansProcessorTests.swift index 68d5355e..26217290 100644 --- a/Tests/OpenTelemetrySdkTests/Trace/Export/BatchSpansProcessorTests.swift +++ b/Tests/OpenTelemetrySdkTests/Trace/Export/BatchSpansProcessorTests.swift @@ -46,7 +46,7 @@ class BatchSpansProcessorTests: XCTestCase { func testStartEndRequirements() { let spansProcessor = BatchSpanProcessor( spanExporter: WaitingSpanExporter(numberToWaitFor: 0), - meterProvider: DefaultStableMeterProvider() + meterProvider: DefaultStableMeterProvider.instance ) XCTAssertFalse(spansProcessor.isStartRequired) XCTAssertTrue(spansProcessor.isEndRequired) @@ -57,7 +57,7 @@ class BatchSpansProcessorTests: XCTestCase { tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( spanExporter: waitingSpanExporter, - meterProvider: DefaultStableMeterProvider(), + meterProvider: DefaultStableMeterProvider.instance, scheduleDelay: maxScheduleDelay) ) let span1 = createSampledEndedSpan(spanName: spanName1) @@ -72,7 +72,7 @@ class BatchSpansProcessorTests: XCTestCase { tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( spanExporter: waitingSpanExporter, - meterProvider: DefaultStableMeterProvider(), + meterProvider: DefaultStableMeterProvider.instance, scheduleDelay: maxScheduleDelay, maxQueueSize: 6, maxExportBatchSize: 2) ) @@ -96,7 +96,7 @@ class BatchSpansProcessorTests: XCTestCase { let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1) let batchSpansProcessor = BatchSpanProcessor( spanExporter: waitingSpanExporter, - meterProvider: DefaultStableMeterProvider(), + meterProvider: DefaultStableMeterProvider.instance, scheduleDelay: 10, maxQueueSize: 10000, maxExportBatchSize: 2000 @@ -116,7 +116,7 @@ class BatchSpansProcessorTests: XCTestCase { let waitingSpanExporter2 = WaitingSpanExporter(numberToWaitFor: 2) tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, waitingSpanExporter2]), - meterProvider: DefaultStableMeterProvider(), + meterProvider: DefaultStableMeterProvider.instance, scheduleDelay: maxScheduleDelay)) let span1 = createSampledEndedSpan(spanName: spanName1) @@ -133,7 +133,7 @@ class BatchSpansProcessorTests: XCTestCase { tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, blockingSpanExporter]), - meterProvider: DefaultStableMeterProvider(), + meterProvider: DefaultStableMeterProvider.instance, scheduleDelay: maxScheduleDelay, maxQueueSize: maxQueuedSpans, maxExportBatchSize: maxQueuedSpans / 2) @@ -191,7 +191,7 @@ class BatchSpansProcessorTests: XCTestCase { tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( spanExporter: waitingSpanExporter, - meterProvider: DefaultStableMeterProvider(), + meterProvider: DefaultStableMeterProvider.instance, scheduleDelay: maxScheduleDelay) ) @@ -214,7 +214,7 @@ class BatchSpansProcessorTests: XCTestCase { // Set the export delay to zero, for no timeout, in order to confirm the #flush() below works tracerSdkFactory.addSpanProcessor(BatchSpanProcessor( spanExporter: waitingSpanExporter, - meterProvider: DefaultStableMeterProvider(), + meterProvider: DefaultStableMeterProvider.instance, scheduleDelay: 0.1) ) From 93f26a49c0ea39ce7f4e2427ee1ab88619f7459e Mon Sep 17 00:00:00 2001 From: mamunto Date: Fri, 22 Nov 2024 13:18:41 -0500 Subject: [PATCH 4/5] Tried to fix the indent --- .../Metrics/Stable/DefaultStableMeter.swift | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift b/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift index f13e37ff..52a1d413 100644 --- a/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift +++ b/Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift @@ -123,18 +123,18 @@ public class DefaultStableMeter : StableMeter { func add(value: Int, attribute: [String : AttributeValue]) {} } - private class NoopLongCounterBuilder : LongCounterBuilder { - func ofDoubles() -> DoubleCounterBuilder { - NoopDoubleCounterBuilder() - } + private class NoopLongCounterBuilder : LongCounterBuilder { + func ofDoubles() -> DoubleCounterBuilder { + NoopDoubleCounterBuilder() + } - func build() -> LongCounter { - NoopLongCounter() - } + func build() -> LongCounter { + NoopLongCounter() + } - func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongCounter { - NoopObservableLongCounter() - } + func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongCounter { + NoopObservableLongCounter() + } } private class NoopDoubleCounterBuilder : DoubleCounterBuilder { From 8e0ad291f0caa6ca50561fe5ebf15db223164a16 Mon Sep 17 00:00:00 2001 From: mamunto Date: Fri, 22 Nov 2024 16:05:40 -0500 Subject: [PATCH 5/5] addressed PR comment --- .../SpanProcessors/BatchSpanProcessor.swift | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift index 9ad3ae1b..c9906fb1 100644 --- a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift +++ b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift @@ -142,6 +142,17 @@ private class BatchWorker: Thread { BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(false) ] + + // Subscribe to new gauge observer + self.spanGaugeObserver = self.spanGaugeBuilder + .buildWithCallback { [count = spanList.count] result in + result.record( + value: count, + attributes: [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) + ] + ) + } } deinit { @@ -160,19 +171,6 @@ private class BatchWorker: Thread { } spanList.append(span) - // If there is any observer before, let's close it - self.spanGaugeObserver?.close() - // Subscribe to new gauge observer - self.spanGaugeObserver = self.spanGaugeBuilder - .buildWithCallback { [count = spanList.count] result in - result.record( - value: count, - attributes: [ - BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) - ] - ) - } - // Notify the worker thread that at half of the queue is available. It will take // time anyway for the thread to wake up. if spanList.count >= halfMaxQueueSize {