Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gRPC authenticator for exporter #6952

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ static void setAuthenticatorOnDelegate(Object builder, Authenticator authenticat
field.setAccessible(true);
Object value = field.get(builder);
if (value instanceof GrpcExporterBuilder) {
throw new IllegalArgumentException("GrpcExporterBuilder not supported yet.");
((GrpcExporterBuilder<?>) value).setAuthenticator(authenticator);
} else if (value instanceof HttpExporterBuilder) {
((HttpExporterBuilder<?>) value).setAuthenticator(authenticator);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.internal.TlsConfigHelper;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.export.RetryPolicy;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class GrpcExporterBuilder<T extends Marshaler> {
private TlsConfigHelper tlsConfigHelper = new TlsConfigHelper();
@Nullable private RetryPolicy retryPolicy = RetryPolicy.getDefault();
private Supplier<MeterProvider> meterProviderSupplier = GlobalOpenTelemetry::getMeterProvider;
@Nullable private Authenticator authenticator;

// Use Object type since gRPC may not be on the classpath.
@Nullable private Object grpcChannel;
Expand Down Expand Up @@ -147,6 +149,11 @@ public GrpcExporterBuilder<T> setMeterProvider(Supplier<MeterProvider> meterProv
return this;
}

public GrpcExporterBuilder<T> setAuthenticator(Authenticator authenticator) {
this.authenticator = authenticator;
return this;
}

@SuppressWarnings("BuilderReturnThis")
public GrpcExporterBuilder<T> copy() {
GrpcExporterBuilder<T> copy =
Expand Down Expand Up @@ -209,7 +216,8 @@ public GrpcExporter<T> build() {
grpcStubFactory,
retryPolicy,
isPlainHttp ? null : tlsConfigHelper.getSslContext(),
isPlainHttp ? null : tlsConfigHelper.getTrustManager());
isPlainHttp ? null : tlsConfigHelper.getTrustManager(),
authenticator);
LOGGER.log(Level.FINE, "Using GrpcSender: " + grpcSender.getClass().getName());

return new GrpcExporter<>(exporterName, type, grpcSender, meterProviderSupplier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.exporter.internal.grpc;

import io.grpc.Channel;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.export.RetryPolicy;
Expand Down Expand Up @@ -40,5 +41,6 @@ <T extends Marshaler> GrpcSender<T> createSender(
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager);
@Nullable X509TrustManager trustManager,
@Nullable Authenticator authenticator);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,9 @@
{
"name":"io.opentelemetry.exporter.internal.compression.Compressor",
"queryAllDeclaredMethods":true
},
{
"name":"io.opentelemetry.exporter.internal.auth.Authenticator",
"queryAllDeclaredMethods":true
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -31,16 +33,22 @@ void getHeaders() {

@Test
void setAuthenticatorOnDelegate_Success() {
HttpExporterBuilder<?> builder =
// For HTTP exporter
HttpExporterBuilder<?> httpBuilder =
new HttpExporterBuilder<>("otlp", "test", "http://localhost:4318/test");

assertThat(builder).extracting("authenticator").isNull();

assertThat(httpBuilder).extracting("authenticator").isNull();
Authenticator authenticator = Collections::emptyMap;
Authenticator.setAuthenticatorOnDelegate(new WithDelegate(httpBuilder), authenticator);
assertThat(httpBuilder)
.extracting("authenticator", as(InstanceOfAssertFactories.type(Authenticator.class)))
.isSameAs(authenticator);

Authenticator.setAuthenticatorOnDelegate(new WithDelegate(builder), authenticator);

assertThat(builder)
// For GRPC exporter
GrpcExporterBuilder<?> grpcBuilder =
new GrpcExporterBuilder<>("otlp", "test", 60, URI.create("test"), null, "/test");
assertThat(grpcBuilder).extracting("authenticator").isNull();
Authenticator.setAuthenticatorOnDelegate(new WithDelegate(grpcBuilder), authenticator);
assertThat(grpcBuilder)
.extracting("authenticator", as(InstanceOfAssertFactories.type(Authenticator.class)))
.isSameAs(authenticator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public void setUp() {
Collections::emptyMap,
null,
null,
null,
null),
MeterProvider::noop);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@

package io.opentelemetry.exporter.sender.grpc.managedchannel.internal;

import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.Codec;
import io.grpc.CompressorRegistry;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.grpc.GrpcSender;
import io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider;
Expand All @@ -21,6 +26,7 @@
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import javax.annotation.Nullable;
Expand All @@ -47,7 +53,8 @@
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager) {
@Nullable X509TrustManager trustManager,
@Nullable Authenticator authenticator) {
boolean shutdownChannel = false;
if (managedChannel == null) {
// Shutdown the channel as part of the exporter shutdown sequence if
Expand Down Expand Up @@ -83,11 +90,29 @@
compression = compressor.getEncoding();
}

CallCredentials cred =
new CallCredentials() {
@Override
public void applyRequestMetadata(
RequestInfo requestInfo, Executor executor, MetadataApplier metadataApplier) {
Metadata headers = new Metadata();
if (authenticator != null) {
// For each header provided in the authenticator, put it in the header of the
// Metadata.
for (Map.Entry<String, String> e : authenticator.getHeaders().entrySet()) {
headers.put(Metadata.Key.of(e.getKey(), ASCII_STRING_MARSHALLER), e.getValue());
}

Check warning on line 104 in exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java#L103-L104

Added lines #L103 - L104 were not covered by tests
}
metadataApplier.apply(headers);
}
};

MarshalerServiceStub<T, ?, ?> stub =
stubFactory
.get()
.apply((Channel) managedChannel, authorityOverride)
.withCompression(compression);
.withCompression(compression)
.withCallCredentials(cred);

return new UpstreamGrpcSender<>(stub, shutdownChannel, timeoutNanos, headersSupplier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import io.opentelemetry.api.internal.InstrumentationUtil;
import io.opentelemetry.exporter.internal.RetryUtil;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil;
import io.opentelemetry.exporter.internal.grpc.GrpcResponse;
Expand All @@ -42,6 +43,7 @@
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import javax.net.ssl.X509TrustManager;
Expand Down Expand Up @@ -80,7 +82,8 @@
Supplier<Map<String, List<String>>> headersSupplier,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager) {
@Nullable X509TrustManager trustManager,
@Nullable Authenticator authenticator) {
OkHttpClient.Builder clientBuilder =
new OkHttpClient.Builder()
.dispatcher(OkHttpUtil.newDispatcher())
Expand All @@ -90,7 +93,6 @@
clientBuilder.addInterceptor(
new RetryInterceptor(retryPolicy, OkHttpGrpcSender::isRetryable));
}

boolean isPlainHttp = endpoint.startsWith("http://");
if (isPlainHttp) {
clientBuilder.connectionSpecs(Collections.singletonList(ConnectionSpec.CLEARTEXT));
Expand All @@ -102,6 +104,20 @@
}
}

if (authenticator != null) {
Map<String, List<String>> headers = headersSupplier.get();

Check warning on line 108 in exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java#L108

Added line #L108 was not covered by tests
// Convert the auth header type of Map<String, String> to the expected type of
// OkHttpGrpcSender of Map<String, List<String>>
Map<String, List<String>> authHeaders =
authenticator.getHeaders().entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> Collections.singletonList(e.getValue())));

Check warning on line 115 in exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java#L111-L115

Added lines #L111 - L115 were not covered by tests
// The authenticator headers will override the default headers if there are duplicate keys.
headers.putAll(authHeaders);
headersSupplier = () -> headers;

Check warning on line 118 in exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java#L117-L118

Added lines #L117 - L118 were not covered by tests
}

this.client = clientBuilder.build();
this.headersSupplier = headersSupplier;
this.url = HttpUrl.get(endpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.exporter.sender.okhttp.internal;

import io.grpc.Channel;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.grpc.GrpcSender;
import io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider;
Expand Down Expand Up @@ -41,7 +42,8 @@ public <T extends Marshaler> GrpcSender<T> createSender(
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager) {
@Nullable X509TrustManager trustManager,
@Nullable Authenticator authenticator) {
return new OkHttpGrpcSender<>(
endpoint.resolve(endpointPath).toString(),
compressor,
Expand All @@ -50,6 +52,7 @@ public <T extends Marshaler> GrpcSender<T> createSender(
headersSupplier,
retryPolicy,
sslContext,
trustManager);
trustManager,
authenticator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ void send(OkHttpGrpcSender<DummyMarshaler> sender, Runnable onSuccess, Runnable
@Override
OkHttpGrpcSender<DummyMarshaler> createSender(String endpoint) {
return new OkHttpGrpcSender<>(
"https://localhost", null, 10L, 10L, Collections::emptyMap, null, null, null);
"https://localhost", null, 10L, 10L, Collections::emptyMap, null, null, null, null);
}

protected static class DummyMarshaler extends MarshalerWithSize {
Expand Down
Loading