Skip to content

Commit

Permalink
Adding gfe_latencies metric to built-in metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
surbhigarg92 committed Nov 20, 2024
1 parent ea1ebad commit 9de6bd0
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ public class BuiltInMetricsConstant {

public static final String GAX_METER_NAME = OpenTelemetryMetricsRecorder.GAX_METER_NAME;

public static final String SPANNER_METER_NAME = "spanner-java";

static final String OPERATION_LATENCIES_NAME = "operation_latencies";

static final String GFE_LATENCIES_NAME = "gfe_latencies";
static final String ATTEMPT_LATENCIES_NAME = "attempt_latencies";
static final String OPERATION_LATENCY_NAME = "operation_latency";
static final String ATTEMPT_LATENCY_NAME = "attempt_latency";
Expand Down Expand Up @@ -114,27 +118,39 @@ static Map<InstrumentSelector, View> getAllViews() {
ImmutableMap.Builder<InstrumentSelector, View> views = ImmutableMap.builder();
defineView(
views,
BuiltInMetricsConstant.GAX_METER_NAME,
BuiltInMetricsConstant.OPERATION_LATENCY_NAME,
BuiltInMetricsConstant.OPERATION_LATENCIES_NAME,
BuiltInMetricsConstant.AGGREGATION_WITH_MILLIS_HISTOGRAM,
InstrumentType.HISTOGRAM,
"ms");
defineView(
views,
BuiltInMetricsConstant.GAX_METER_NAME,
BuiltInMetricsConstant.ATTEMPT_LATENCY_NAME,
BuiltInMetricsConstant.ATTEMPT_LATENCIES_NAME,
BuiltInMetricsConstant.AGGREGATION_WITH_MILLIS_HISTOGRAM,
InstrumentType.HISTOGRAM,
"ms");
defineView(
views,
BuiltInMetricsConstant.SPANNER_METER_NAME,
BuiltInMetricsConstant.GFE_LATENCIES_NAME,
BuiltInMetricsConstant.GFE_LATENCIES_NAME,
BuiltInMetricsConstant.AGGREGATION_WITH_MILLIS_HISTOGRAM,
InstrumentType.HISTOGRAM,
"ms");
defineView(
views,
BuiltInMetricsConstant.GAX_METER_NAME,
BuiltInMetricsConstant.OPERATION_COUNT_NAME,
BuiltInMetricsConstant.OPERATION_COUNT_NAME,
Aggregation.sum(),
InstrumentType.COUNTER,
"1");
defineView(
views,
BuiltInMetricsConstant.GAX_METER_NAME,
BuiltInMetricsConstant.ATTEMPT_COUNT_NAME,
BuiltInMetricsConstant.ATTEMPT_COUNT_NAME,
Aggregation.sum(),
Expand All @@ -145,6 +161,7 @@ static Map<InstrumentSelector, View> getAllViews() {

private static void defineView(
ImmutableMap.Builder<InstrumentSelector, View> viewMap,
String meterName,
String metricName,
String metricViewName,
Aggregation aggregation,
Expand All @@ -153,7 +170,7 @@ private static void defineView(
InstrumentSelector selector =
InstrumentSelector.builder()
.setName(BuiltInMetricsConstant.METER_NAME + '/' + metricName)
.setMeterName(BuiltInMetricsConstant.GAX_METER_NAME)
.setMeterName(meterName)
.setType(type)
.setUnit(unit)
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.api.gax.core.GaxProperties;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;
import java.util.Map;

/**
* OpenTelemetry implementation of recording metrics. This implementation collections the
* measurements related to the lifecyle of an RPC.
*
* <p>For the Otel implementation, an attempt is a single RPC invocation and an operation is the
* collection of all the attempts made before a response is returned (either as a success or an
* error). A single call (i.e. `EchoClient.echo()`) should have an operation_count of 1 and may have
* an attempt_count of 1+ (depending on the retry configurations).
*/
public class BuiltInOpenTelemetryMetricsRecorder {

private final DoubleHistogram gfeLatencyRecorder;

/**
* Creates the following instruments for the following metrics:
*
* <ul>
* <li>GFE Latency: Histogram
* </ul>
*
* @param openTelemetry OpenTelemetry instance
*/
public BuiltInOpenTelemetryMetricsRecorder(OpenTelemetry openTelemetry) {
Meter meter =
openTelemetry
.meterBuilder(BuiltInMetricsConstant.SPANNER_METER_NAME)
.setInstrumentationVersion(GaxProperties.getLibraryVersion(getClass()))
.build();
this.gfeLatencyRecorder =
meter
.histogramBuilder(
BuiltInMetricsConstant.METER_NAME + '/' + BuiltInMetricsConstant.GFE_LATENCIES_NAME)
.setDescription(
"Latency between Google's network receiving an RPC and reading back the first byte of the response")
.setUnit("ms")
.build();
}

/**
* Record the latency between Google's network receiving an RPC and reading back the first byte of
* the response. Data is stored in a Histogram.
*
* @param gfeLatency Attempt Latency in ms
* @param attributes Map of the attributes to store
*/
public void recordGFELatency(double gfeLatency, Map<String, String> attributes) {
gfeLatencyRecorder.record(gfeLatency, toOtelAttributes(attributes));
}

@VisibleForTesting
Attributes toOtelAttributes(Map<String, String> attributes) {
Preconditions.checkNotNull(attributes, "Attributes map cannot be null");
AttributesBuilder attributesBuilder = Attributes.builder();
attributes.forEach(attributesBuilder::put);
return attributesBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1680,6 +1680,15 @@ public OpenTelemetry getOpenTelemetry() {
}
}

/**
* Returns an instance of OpenTelemetry. If OpenTelemetry object is not set via SpannerOptions
* then GlobalOpenTelemetry will be used as fallback.
*/
public OpenTelemetry getBuiltInMetricsOpenTelemetry() {
return this.builtInOpenTelemetryMetricsProvider.getOrCreateOpenTelemetry(
this.getProjectId(), getCredentials());
}

@Override
public ApiTracerFactory getApiTracerFactory() {
return createApiTracerFactory(false, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
options.getInterceptorProvider(),
SpannerInterceptorProvider.createDefault(
options.getOpenTelemetry(),
options.getBuiltInMetricsOpenTelemetry(),
(() -> directPathEnabledSupplier.get()))))
// This sets the trace context headers.
.withTraceContext(endToEndTracingEnabled, options.getOpenTelemetry())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import com.google.api.gax.tracing.ApiTracer;
import com.google.cloud.spanner.BuiltInMetricsConstant;
import com.google.cloud.spanner.BuiltInOpenTelemetryMetricsRecorder;
import com.google.cloud.spanner.CompositeTracer;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerRpcMetrics;
Expand Down Expand Up @@ -94,12 +95,17 @@ class HeaderInterceptor implements ClientInterceptor {
private static final Level LEVEL = Level.INFO;
private final SpannerRpcMetrics spannerRpcMetrics;

private final BuiltInOpenTelemetryMetricsRecorder builtInOpenTelemetryMetricsRecorder;

private final Supplier<Boolean> directPathEnabledSupplier;

HeaderInterceptor(
SpannerRpcMetrics spannerRpcMetrics, Supplier<Boolean> directPathEnabledSupplier) {
SpannerRpcMetrics spannerRpcMetrics,
BuiltInOpenTelemetryMetricsRecorder builtInOpenTelemetryMetricsRecorder,
Supplier<Boolean> directPathEnabledSupplier) {
this.spannerRpcMetrics = spannerRpcMetrics;
this.directPathEnabledSupplier = directPathEnabledSupplier;
this.builtInOpenTelemetryMetricsRecorder = builtInOpenTelemetryMetricsRecorder;
}

@Override
Expand All @@ -118,17 +124,22 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
TagContext tagContext = getTagContext(key, method.getFullMethodName(), databaseName);
Attributes attributes =
getMetricAttributes(key, method.getFullMethodName(), databaseName);
Map<String, String> builtInMetricsAttributes =
getBuiltInMetricAttributes(key, databaseName);
Map<String, String> commonBuiltInMetricAttributes =
getCommonBuiltInMetricAttributes(key, databaseName);
super.start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata metadata) {
Boolean isDirectPathUsed =
isDirectPathUsed(getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
addBuiltInMetricAttributes(
compositeTracer, builtInMetricsAttributes, isDirectPathUsed);
processHeader(metadata, tagContext, attributes, span);
compositeTracer, commonBuiltInMetricAttributes, isDirectPathUsed);
processHeader(
metadata,
tagContext,
attributes,
span,
getBuiltInMetricAttributes(commonBuiltInMetricAttributes, isDirectPathUsed));
super.onHeaders(metadata);
}
},
Expand All @@ -142,7 +153,11 @@ public void onHeaders(Metadata metadata) {
}

private void processHeader(
Metadata metadata, TagContext tagContext, Attributes attributes, Span span) {
Metadata metadata,
TagContext tagContext,
Attributes attributes,
Span span,
Map<String, String> builtInMetricsAttributes) {
MeasureMap measureMap = STATS_RECORDER.newMeasureMap();
String serverTiming = metadata.get(SERVER_TIMING_HEADER_KEY);
if (serverTiming != null && serverTiming.startsWith(SERVER_TIMING_HEADER_PREFIX)) {
Expand All @@ -154,6 +169,7 @@ private void processHeader(

spannerRpcMetrics.recordGfeLatency(latency, attributes);
spannerRpcMetrics.recordGfeHeaderMissingCount(0L, attributes);
builtInOpenTelemetryMetricsRecorder.recordGFELatency(latency, builtInMetricsAttributes);

if (span != null) {
span.setAttribute("gfe_latency", String.valueOf(latency));
Expand Down Expand Up @@ -224,8 +240,8 @@ private Attributes getMetricAttributes(String key, String method, DatabaseName d
});
}

private Map<String, String> getBuiltInMetricAttributes(String key, DatabaseName databaseName)
throws ExecutionException {
private Map<String, String> getCommonBuiltInMetricAttributes(
String key, DatabaseName databaseName) throws ExecutionException {
return builtInAttributesCache.get(
key,
() -> {
Expand All @@ -240,17 +256,21 @@ private Map<String, String> getBuiltInMetricAttributes(String key, DatabaseName
});
}

private Map<String, String> getBuiltInMetricAttributes(
Map<String, String> commonBuiltInMetricsAttributes, Boolean isDirectPathUsed) {
Map<String, String> builtInMetricAttributes = new HashMap<>(commonBuiltInMetricsAttributes);
builtInMetricAttributes.put(
BuiltInMetricsConstant.DIRECT_PATH_USED_KEY.getKey(), Boolean.toString(isDirectPathUsed));
return builtInMetricAttributes;
}

private void addBuiltInMetricAttributes(
CompositeTracer compositeTracer,
Map<String, String> builtInMetricsAttributes,
Map<String, String> commonBuiltInMetricsAttributes,
Boolean isDirectPathUsed) {
if (compositeTracer != null) {
// Direct Path used attribute
Map<String, String> attributes = new HashMap<>(builtInMetricsAttributes);
attributes.put(
BuiltInMetricsConstant.DIRECT_PATH_USED_KEY.getKey(), Boolean.toString(isDirectPathUsed));

compositeTracer.addAttributes(attributes);
compositeTracer.addAttributes(
getBuiltInMetricAttributes(commonBuiltInMetricsAttributes, isDirectPathUsed));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.api.core.InternalApi;
import com.google.api.core.ObsoleteApi;
import com.google.api.gax.grpc.GrpcInterceptorProvider;
import com.google.cloud.spanner.BuiltInOpenTelemetryMetricsRecorder;
import com.google.cloud.spanner.SpannerRpcMetrics;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
Expand All @@ -44,26 +45,33 @@ private SpannerInterceptorProvider(List<ClientInterceptor> clientInterceptors) {

@ObsoleteApi("This method always uses Global OpenTelemetry")
public static SpannerInterceptorProvider createDefault() {
return createDefault(GlobalOpenTelemetry.get());
return createDefault(GlobalOpenTelemetry.get(), GlobalOpenTelemetry.get());
}

public static SpannerInterceptorProvider createDefault(OpenTelemetry openTelemetry) {
public static SpannerInterceptorProvider createDefault(
OpenTelemetry openTelemetry, OpenTelemetry builtInMetricsopenTelemetry) {
return createDefault(
openTelemetry,
builtInMetricsopenTelemetry,
Suppliers.memoize(
() -> {
return false;
}));
}

public static SpannerInterceptorProvider createDefault(
OpenTelemetry openTelemetry, Supplier<Boolean> directPathEnabledSupplier) {
OpenTelemetry openTelemetry,
OpenTelemetry builtInMetricsopenTelemetry,
Supplier<Boolean> directPathEnabledSupplier) {
List<ClientInterceptor> defaultInterceptorList = new ArrayList<>();
defaultInterceptorList.add(new SpannerErrorInterceptor());
defaultInterceptorList.add(
new LoggingInterceptor(Logger.getLogger(GapicSpannerRpc.class.getName()), Level.FINER));
defaultInterceptorList.add(
new HeaderInterceptor(new SpannerRpcMetrics(openTelemetry), directPathEnabledSupplier));
new HeaderInterceptor(
new SpannerRpcMetrics(openTelemetry),
new BuiltInOpenTelemetryMetricsRecorder(builtInMetricsopenTelemetry),
directPathEnabledSupplier));
return new SpannerInterceptorProvider(ImmutableList.copyOf(defaultInterceptorList));
}

Expand Down

0 comments on commit 9de6bd0

Please sign in to comment.