Skip to content

Commit

Permalink
fix: Fix Broken OTEL Sink (#2567)
Browse files Browse the repository at this point in the history
  • Loading branch information
hkfgo authored Nov 12, 2024
1 parent 1f21f61 commit 71d6827
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.Linq;
using System.Threading.Channels;
using System.Threading;
using System.Threading.Tasks;
using GuardNet;
using Microsoft.Extensions.Logging;
Expand All @@ -17,11 +17,11 @@ namespace Promitor.Integrations.Sinks.OpenTelemetry
public class OpenTelemetryCollectorMetricSink : MetricSink, IMetricSink
{
private readonly ILogger<OpenTelemetryCollectorMetricSink> _logger;
private static readonly Meter azureMonitorMeter = new("Promitor.Scraper.Metrics.AzureMonitor", "1.0");
private static readonly Meter azureMonitorMeter = new Meter("Promitor.Scraper.Metrics.AzureMonitor", "1.0");

public MetricSinkType Type => MetricSinkType.OpenTelemetryCollector;

public OpenTelemetryCollectorMetricSink(IMetricsDeclarationProvider metricsDeclarationProvider, ILogger<OpenTelemetryCollectorMetricSink> logger)
public OpenTelemetryCollectorMetricSink(IMetricsDeclarationProvider metricsDeclarationProvider, ILogger<OpenTelemetryCollectorMetricSink> logger)
: base(metricsDeclarationProvider, logger)
{
Guard.NotNull(logger, nameof(logger));
Expand All @@ -40,66 +40,53 @@ public async Task ReportMetricAsync(string metricName, string metricDescription,
foreach (var measuredMetric in scrapeResult.MetricValues)
{
var metricValue = measuredMetric.Value ?? 0;

var metricLabels = DetermineLabels(metricName, scrapeResult, measuredMetric);

var reportMetricTask = ReportMetricAsync(metricName, metricDescription, metricValue, metricLabels);

reportMetricTasks.Add(reportMetricTask);
}

await Task.WhenAll(reportMetricTasks);
}

private readonly ConcurrentDictionary<string, ObservableGauge<double>> _gauges = new();
private readonly ConcurrentDictionary<string, Channel<Measurement<double>>> _measurements = new();
private readonly ConcurrentDictionary<string, ObservableGauge<double>> _gauges = new ConcurrentDictionary<string, ObservableGauge<double>>();
private readonly ConcurrentDictionary<string, HashSet<Measurement<double>>> _measurements = new ConcurrentDictionary<string, HashSet<Measurement<double>>>();

public async Task ReportMetricAsync(string metricName, string metricDescription, double metricValue, Dictionary<string, string> labels)
public Task ReportMetricAsync(string metricName, string metricDescription, double metricValue, Dictionary<string, string> labels)
{
Guard.NotNullOrEmpty(metricName, nameof(metricName));

// TODO: Move to factory instead?
// TODO: Move to factory instead?
if (_gauges.ContainsKey(metricName) == false)
{
InitializeNewMetric(metricName, metricDescription);
}

var composedTags = labels.Select(kvp => new KeyValuePair<string, object?>(kvp.Key, kvp.Value)).ToArray();
var newMeasurement = new Measurement<double>(metricValue, composedTags);
var channelWriter = _measurements[metricName].Writer;
await channelWriter.WriteAsync(newMeasurement);

_logger.LogTrace("Metric {MetricName} with value {MetricValue} was pushed to OpenTelemetry Collector", metricName, metricValue);
_measurements[metricName].Add(newMeasurement);

_logger.LogTrace("Metric {MetricName} with value {MetricValue} and labels {Labels} was pushed to OpenTelemetry Collector", metricName, metricValue, composedTags);

return Task.CompletedTask;
}

private void InitializeNewMetric(string metricName, string metricDescription)
{
var gauge = azureMonitorMeter.CreateObservableGauge(metricName, description: metricDescription, observeValues: () => ReportMeasurementsForMetricAsync(metricName).Result);
var gauge = azureMonitorMeter.CreateObservableGauge<double>(metricName, description: metricDescription, observeValues: () => ReportMeasurementsForMetric(metricName));
_gauges.TryAdd(metricName, gauge);

_measurements.TryAdd(metricName, CreateNewMeasurementChannel());
_measurements.TryAdd(metricName, []);
}

private async Task<IEnumerable<Measurement<double>>> ReportMeasurementsForMetricAsync(string metricName)
private IEnumerable<Measurement<double>> ReportMeasurementsForMetric(string metricName)
{
List<Measurement<double>> measurementsToReport = new List<Measurement<double>>();
var channel = _measurements[metricName];

var totalCount = channel.Reader.Count;
var readItems = 0;
do
{
var item = await channel.Reader.ReadAsync();
measurementsToReport.Add(item);
readItems++;
}
while (readItems < totalCount);
var recordedMeasurements = _measurements[metricName];

return measurementsToReport;
}
var measurementsToReport = Interlocked.Exchange(ref recordedMeasurements, []);

static Channel<Measurement<double>> CreateNewMeasurementChannel()
{
return Channel.CreateUnbounded<Measurement<double>>();
return measurementsToReport;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ internal PrometheusClient CreateForOpenTelemetryCollector(IConfiguration configu
var baseUri = configuration["OpenTelemetry:Collector:Uri"];
var metricNamespace = configuration["OpenTelemetry:Collector:MetricNamespace"];

_logger.LogInformation("Creating Prometheus client for {BaseUri}/metrics with metric namespace {metricNamespace}", baseUri, metricNamespace);
_logger.LogWarning("Creating Prometheus client for {BaseUri}/metrics with metric namespace {metricNamespace}", baseUri, metricNamespace);

return new PrometheusClient(baseUri, "/metrics", metricNamespace, _logger);
}
Expand Down

0 comments on commit 71d6827

Please sign in to comment.