diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure/build.gradle.kts b/instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure/build.gradle.kts index 159d9c58745f..e73abc7bc35f 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure/build.gradle.kts +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure/build.gradle.kts @@ -5,6 +5,8 @@ plugins { base.archivesName.set("${base.archivesName.get()}-autoconfigure") dependencies { + compileOnly(project(":javaagent-extension-api")) + implementation(project(":instrumentation:aws-sdk:aws-sdk-2.2:library")) library("software.amazon.awssdk:aws-core:2.2.0") @@ -29,5 +31,6 @@ tasks { systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", true) systemProperty("otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", true) systemProperty("otel.instrumentation.aws-sdk.experimental-record-individual-http-error", true) + systemProperty("otel.instrumentation.messaging.experimental.capture-headers", "test-message-header") } } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/autoconfigure/AwsSdkSingletons.java b/instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/autoconfigure/AwsSdkSingletons.java index d35676cdd180..b4870cf3edda 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/autoconfigure/AwsSdkSingletons.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/autoconfigure/AwsSdkSingletons.java @@ -5,35 +5,75 @@ package io.opentelemetry.instrumentation.awssdk.v2_2.autoconfigure; +import static java.util.Collections.emptyList; + import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil; import io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkTelemetry; +import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; +import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig; +import java.util.List; public final class AwsSdkSingletons { - private static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES = - ConfigPropertiesUtil.getBoolean( - "otel.instrumentation.aws-sdk.experimental-span-attributes", false); + private static final boolean HAS_INSTRUMENTATION_CONFIG = hasAgentConfiguration(); + private static final AwsSdkTelemetry TELEMETRY = + AwsSdkTelemetry.builder(GlobalOpenTelemetry.get()) + .setCapturedHeaders(getCapturedHeaders()) + .setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes()) + .setMessagingReceiveInstrumentationEnabled(messagingReceiveInstrumentationEnabled()) + .setUseConfiguredPropagatorForMessaging(useMessagingPropagator()) + .setRecordIndividualHttpError(recordIndividualHttpError()) + .build(); - private static final boolean USE_MESSAGING_PROPAGATOR = - ConfigPropertiesUtil.getBoolean( - "otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false); + private static boolean hasAgentConfiguration() { + try { + Class.forName("io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig"); + return true; + } catch (ClassNotFoundException e) { + return false; + } + } - private static final boolean RECORD_INDIVIDUAL_HTTP_ERROR = - ConfigPropertiesUtil.getBoolean( - "otel.instrumentation.aws-sdk.experimental-record-individual-http-error", false); + private static List getCapturedHeaders() { + if (HAS_INSTRUMENTATION_CONFIG) { + return ExperimentalConfig.get().getMessagingHeaders(); + } else { + return ConfigPropertiesUtil.getList( + "otel.instrumentation.messaging.experimental.capture-headers", emptyList()); + } + } + + private static boolean captureExperimentalSpanAttributes() { + return getBoolean("otel.instrumentation.aws-sdk.experimental-span-attributes", false); + } - private static final boolean RECEIVE_TELEMETRY_ENABLED = - ConfigPropertiesUtil.getBoolean( + private static boolean messagingReceiveInstrumentationEnabled() { + if (HAS_INSTRUMENTATION_CONFIG) { + return ExperimentalConfig.get().messagingReceiveInstrumentationEnabled(); + } else { + return ConfigPropertiesUtil.getBoolean( "otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false); + } + } - private static final AwsSdkTelemetry TELEMETRY = - AwsSdkTelemetry.builder(GlobalOpenTelemetry.get()) - .setCaptureExperimentalSpanAttributes(CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) - .setMessagingReceiveInstrumentationEnabled(RECEIVE_TELEMETRY_ENABLED) - .setUseConfiguredPropagatorForMessaging(USE_MESSAGING_PROPAGATOR) - .setRecordIndividualHttpError(RECORD_INDIVIDUAL_HTTP_ERROR) - .build(); + private static boolean useMessagingPropagator() { + return getBoolean( + "otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false); + } + + private static boolean recordIndividualHttpError() { + return getBoolean( + "otel.instrumentation.aws-sdk.experimental-record-individual-http-error", false); + } + + private static boolean getBoolean(String name, boolean defaultValue) { + if (HAS_INSTRUMENTATION_CONFIG) { + return InstrumentationConfig.get().getBoolean(name, defaultValue); + } else { + return ConfigPropertiesUtil.getBoolean(name, defaultValue); + } + } public static AwsSdkTelemetry telemetry() { return TELEMETRY; diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractSqsRequest.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractSqsRequest.java new file mode 100644 index 000000000000..8413ce772df4 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractSqsRequest.java @@ -0,0 +1,13 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v2_2; + +import software.amazon.awssdk.core.interceptor.ExecutionAttributes; + +abstract class AbstractSqsRequest { + + public abstract ExecutionAttributes getRequest(); +} diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkInstrumenterFactory.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkInstrumenterFactory.java index 0f42c76ee1d4..ef6a72cbceab 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkInstrumenterFactory.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkInstrumenterFactory.java @@ -5,6 +5,9 @@ package io.opentelemetry.instrumentation.awssdk.v2_2; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; + import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.trace.Span; @@ -18,11 +21,13 @@ import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcClientAttributesExtractor; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.function.Function; import javax.annotation.Nullable; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute; @@ -63,38 +68,73 @@ final class AwsSdkInstrumenterFactory { Arrays.asList( rpcAttributesExtractor, httpAttributesExtractor, experimentalAttributesExtractor); - static Instrumenter requestInstrumenter( - OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) { + private final OpenTelemetry openTelemetry; + @Nullable private final TextMapPropagator messagingPropagator; + private final List capturedHeaders; + private final boolean captureExperimentalSpanAttributes; + private final boolean messagingReceiveInstrumentationEnabled; + private final boolean useXrayPropagator; + AwsSdkInstrumenterFactory( + OpenTelemetry openTelemetry, + @Nullable TextMapPropagator messagingPropagator, + List capturedHeaders, + boolean captureExperimentalSpanAttributes, + boolean messagingReceiveInstrumentationEnabled, + boolean useXrayPropagator) { + this.openTelemetry = openTelemetry; + this.messagingPropagator = messagingPropagator; + this.capturedHeaders = capturedHeaders; + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + this.messagingReceiveInstrumentationEnabled = messagingReceiveInstrumentationEnabled; + this.useXrayPropagator = useXrayPropagator; + } + + Instrumenter requestInstrumenter() { return createInstrumenter( openTelemetry, - captureExperimentalSpanAttributes - ? extendedAttributesExtractors - : defaultAttributesExtractors, AwsSdkInstrumenterFactory::spanName, SpanKindExtractor.alwaysClient(), + attributesExtractors(), + emptyList(), true); } - static Instrumenter consumerReceiveInstrumenter( - OpenTelemetry openTelemetry, - boolean captureExperimentalSpanAttributes, - boolean messagingReceiveInstrumentationEnabled) { - return sqsInstrumenter( + private List> attributesExtractors() { + return captureExperimentalSpanAttributes + ? extendedAttributesExtractors + : defaultAttributesExtractors; + } + + private List> consumerAttributesExtractors() { + return captureExperimentalSpanAttributes + ? extendedConsumerAttributesExtractors + : defaultConsumerAttributesExtractors; + } + + private AttributesExtractor messagingAttributesExtractor( + MessagingAttributesGetter getter, MessageOperation operation) { + return MessagingAttributesExtractor.builder(getter, operation) + .setCapturedHeaders(capturedHeaders) + .build(); + } + + Instrumenter consumerReceiveInstrumenter() { + MessageOperation operation = MessageOperation.RECEIVE; + SqsReceiveRequestAttributesGetter getter = SqsReceiveRequestAttributesGetter.INSTANCE; + AttributesExtractor messagingAttributeExtractor = + messagingAttributesExtractor(getter, operation); + + return createInstrumenter( openTelemetry, - MessageOperation.RECEIVE, - captureExperimentalSpanAttributes - ? extendedConsumerAttributesExtractors - : defaultConsumerAttributesExtractors, + MessagingSpanNameExtractor.create(getter, operation), + SpanKindExtractor.alwaysConsumer(), + toSqsRequestExtractors(consumerAttributesExtractors(), Function.identity()), + singletonList(messagingAttributeExtractor), messagingReceiveInstrumentationEnabled); } - static Instrumenter consumerProcessInstrumenter( - OpenTelemetry openTelemetry, - TextMapPropagator messagingPropagator, - boolean captureExperimentalSpanAttributes, - boolean messagingReceiveInstrumentationEnabled, - boolean shouldUseXrayPropagator) { + Instrumenter consumerProcessInstrumenter() { MessageOperation operation = MessageOperation.PROCESS; SqsProcessRequestAttributesGetter getter = SqsProcessRequestAttributesGetter.INSTANCE; @@ -104,96 +144,83 @@ static Instrumenter consumerProcessInstrumenter( INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create(getter, operation)) .addAttributesExtractors( - toProcessRequestExtractors( - captureExperimentalSpanAttributes - ? extendedConsumerAttributesExtractors - : defaultConsumerAttributesExtractors)) - .addAttributesExtractor( - MessagingAttributesExtractor.builder(getter, operation).build()); + toSqsRequestExtractors(consumerAttributesExtractors(), unused -> null)) + .addAttributesExtractor(messagingAttributesExtractor(getter, operation)); if (messagingReceiveInstrumentationEnabled) { builder.addSpanLinksExtractor( (spanLinks, parentContext, request) -> { Context extracted = SqsParentContext.ofMessage( - request.getMessage(), messagingPropagator, shouldUseXrayPropagator); + request.getMessage(), messagingPropagator, useXrayPropagator); spanLinks.addLink(Span.fromContext(extracted).getSpanContext()); }); } return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } - private static List> toProcessRequestExtractors( - List> extractors) { - List> result = new ArrayList<>(); + private static + List> toSqsRequestExtractors( + List> extractors, + Function responseConverter) { + List> result = new ArrayList<>(); for (AttributesExtractor extractor : extractors) { result.add( - new AttributesExtractor() { + new AttributesExtractor() { @Override public void onStart( AttributesBuilder attributes, Context parentContext, - SqsProcessRequest sqsProcessRequest) { - extractor.onStart(attributes, parentContext, sqsProcessRequest.getRequest()); + AbstractSqsRequest sqsRequest) { + extractor.onStart(attributes, parentContext, sqsRequest.getRequest()); } @Override public void onEnd( AttributesBuilder attributes, Context context, - SqsProcessRequest sqsProcessRequest, - @Nullable Void unused, + AbstractSqsRequest sqsRequest, + @Nullable RESPONSE response, @Nullable Throwable error) { - extractor.onEnd(attributes, context, sqsProcessRequest.getRequest(), null, error); + extractor.onEnd( + attributes, + context, + sqsRequest.getRequest(), + responseConverter.apply(response), + error); } }); } return result; } - static Instrumenter producerInstrumenter( - OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) { - return sqsInstrumenter( - openTelemetry, - MessageOperation.PUBLISH, - captureExperimentalSpanAttributes - ? extendedAttributesExtractors - : defaultAttributesExtractors, - true); - } - - private static Instrumenter sqsInstrumenter( - OpenTelemetry openTelemetry, - MessageOperation operation, - List> extractors, - boolean enabled) { + Instrumenter producerInstrumenter() { + MessageOperation operation = MessageOperation.PUBLISH; SqsAttributesGetter getter = SqsAttributesGetter.INSTANCE; AttributesExtractor messagingAttributeExtractor = - MessagingAttributesExtractor.builder(getter, operation).build(); - List> newExtractors = - new ArrayList<>(extractors); - newExtractors.add(messagingAttributeExtractor); + messagingAttributesExtractor(getter, operation); return createInstrumenter( openTelemetry, - newExtractors, MessagingSpanNameExtractor.create(getter, operation), - operation == MessageOperation.PUBLISH - ? SpanKindExtractor.alwaysProducer() - : SpanKindExtractor.alwaysConsumer(), - enabled); + SpanKindExtractor.alwaysProducer(), + attributesExtractors(), + singletonList(messagingAttributeExtractor), + true); } - private static Instrumenter createInstrumenter( + private static Instrumenter createInstrumenter( OpenTelemetry openTelemetry, - List> extractors, - SpanNameExtractor spanNameExtractor, - SpanKindExtractor spanKindExtractor, + SpanNameExtractor spanNameExtractor, + SpanKindExtractor spanKindExtractor, + List> attributeExtractors, + List> additionalAttributeExtractors, boolean enabled) { - return Instrumenter.builder( + return Instrumenter.builder( openTelemetry, INSTRUMENTATION_NAME, spanNameExtractor) - .addAttributesExtractors(extractors) + .addAttributesExtractors(attributeExtractors) + .addAttributesExtractors(additionalAttributeExtractors) .setEnabled(enabled) .buildInstrumenter(spanKindExtractor); } @@ -203,6 +230,4 @@ private static String spanName(ExecutionAttributes attributes) { String awsOperation = attributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME); return awsServiceName + "." + awsOperation; } - - private AwsSdkInstrumenterFactory() {} } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetry.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetry.java index f1e681e97958..afa27630626a 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetry.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetry.java @@ -9,6 +9,7 @@ import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle; +import java.util.List; import javax.annotation.Nullable; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; @@ -44,7 +45,7 @@ public static AwsSdkTelemetryBuilder builder(OpenTelemetry openTelemetry) { } private final Instrumenter requestInstrumenter; - private final Instrumenter consumerReceiveInstrumenter; + private final Instrumenter consumerReceiveInstrumenter; private final Instrumenter consumerProcessInstrumenter; private final Instrumenter producerInstrumenter; private final boolean captureExperimentalSpanAttributes; @@ -54,6 +55,7 @@ public static AwsSdkTelemetryBuilder builder(OpenTelemetry openTelemetry) { AwsSdkTelemetry( OpenTelemetry openTelemetry, + List capturedHeaders, boolean captureExperimentalSpanAttributes, boolean useMessagingPropagator, boolean useXrayPropagator, @@ -62,24 +64,20 @@ public static AwsSdkTelemetryBuilder builder(OpenTelemetry openTelemetry) { this.useXrayPropagator = useXrayPropagator; this.messagingPropagator = useMessagingPropagator ? openTelemetry.getPropagators().getTextMapPropagator() : null; - this.requestInstrumenter = - AwsSdkInstrumenterFactory.requestInstrumenter( - openTelemetry, captureExperimentalSpanAttributes); - this.consumerReceiveInstrumenter = - AwsSdkInstrumenterFactory.consumerReceiveInstrumenter( - openTelemetry, - captureExperimentalSpanAttributes, - messagingReceiveInstrumentationEnabled); - this.consumerProcessInstrumenter = - AwsSdkInstrumenterFactory.consumerProcessInstrumenter( + + AwsSdkInstrumenterFactory instrumenterFactory = + new AwsSdkInstrumenterFactory( openTelemetry, messagingPropagator, + capturedHeaders, captureExperimentalSpanAttributes, messagingReceiveInstrumentationEnabled, useXrayPropagator); - this.producerInstrumenter = - AwsSdkInstrumenterFactory.producerInstrumenter( - openTelemetry, captureExperimentalSpanAttributes); + + this.requestInstrumenter = instrumenterFactory.requestInstrumenter(); + this.consumerReceiveInstrumenter = instrumenterFactory.consumerReceiveInstrumenter(); + this.consumerProcessInstrumenter = instrumenterFactory.consumerProcessInstrumenter(); + this.producerInstrumenter = instrumenterFactory.producerInstrumenter(); this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; this.recordIndividualHttpError = recordIndividualHttpError; } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetryBuilder.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetryBuilder.java index a4a44ef96c8a..11ddca0ea559 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetryBuilder.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetryBuilder.java @@ -5,14 +5,18 @@ package io.opentelemetry.instrumentation.awssdk.v2_2; +import static java.util.Collections.emptyList; + import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.api.OpenTelemetry; +import java.util.List; /** A builder of {@link AwsSdkTelemetry}. */ public final class AwsSdkTelemetryBuilder { private final OpenTelemetry openTelemetry; + private List capturedHeaders = emptyList(); private boolean captureExperimentalSpanAttributes; private boolean useMessagingPropagator; private boolean recordIndividualHttpError; @@ -23,6 +27,17 @@ public final class AwsSdkTelemetryBuilder { this.openTelemetry = openTelemetry; } + /** + * Configures the messaging headers that will be captured as span attributes. + * + * @param capturedHeaders A list of messaging header names. + */ + @CanIgnoreReturnValue + public AwsSdkTelemetryBuilder setCapturedHeaders(List capturedHeaders) { + this.capturedHeaders = capturedHeaders; + return this; + } + /** * Sets whether experimental attributes should be set to spans. These attributes may be changed or * removed in the future, so only enable this if you know you do not require attributes filled by @@ -104,6 +119,7 @@ public AwsSdkTelemetryBuilder setMessagingReceiveInstrumentationEnabled( public AwsSdkTelemetry build() { return new AwsSdkTelemetry( openTelemetry, + capturedHeaders, captureExperimentalSpanAttributes, useMessagingPropagator, useXrayPropagator, diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsAccess.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsAccess.java index 11ba73ab2153..29baec5c6bf1 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsAccess.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsAccess.java @@ -57,6 +57,11 @@ static String getQueueUrl(SdkRequest request) { return enabled ? SqsImpl.getQueueUrl(request) : null; } + @NoMuzzle + static String getMessageAttribute(SdkRequest request, String name) { + return enabled ? SqsImpl.getMessageAttribute(request, name) : null; + } + @NoMuzzle static String getMessageId(SdkResponse response) { return enabled ? SqsImpl.getMessageId(response) : null; diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsAttributesGetter.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsAttributesGetter.java index 4eff71337275..eef9ad4898e4 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsAttributesGetter.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsAttributesGetter.java @@ -69,7 +69,8 @@ public String getMessageId(ExecutionAttributes request, @Nullable Response respo @Override public List getMessageHeader(ExecutionAttributes request, String name) { - // TODO: not implemented - return Collections.emptyList(); + SdkRequest sdkRequest = request.getAttribute(TracingExecutionInterceptor.SDK_REQUEST_ATTRIBUTE); + String value = SqsAccess.getMessageAttribute(sdkRequest, name); + return value != null ? Collections.singletonList(value) : Collections.emptyList(); } } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsImpl.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsImpl.java index f346fed69a33..cad1cf4a3ee5 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsImpl.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsImpl.java @@ -68,16 +68,17 @@ static boolean afterReceiveMessageExecution( io.opentelemetry.context.Context parentContext = TracingExecutionInterceptor.getParentContext(executionAttributes); - Instrumenter consumerReceiveInstrumenter = + Instrumenter consumerReceiveInstrumenter = config.getConsumerReceiveInstrumenter(); io.opentelemetry.context.Context receiveContext = null; - if (timer != null - && consumerReceiveInstrumenter.shouldStart(parentContext, executionAttributes)) { + SqsReceiveRequest receiveRequest = + SqsReceiveRequest.create(executionAttributes, SqsMessageImpl.wrap(response.messages())); + if (timer != null && consumerReceiveInstrumenter.shouldStart(parentContext, receiveRequest)) { receiveContext = InstrumenterUtil.startAndEnd( consumerReceiveInstrumenter, parentContext, - executionAttributes, + receiveRequest, new Response(context.httpResponse(), response), null, timer.startTime(), @@ -258,6 +259,14 @@ static String getQueueUrl(SdkRequest request) { return null; } + static String getMessageAttribute(SdkRequest request, String name) { + if (request instanceof SendMessageRequest) { + MessageAttributeValue value = ((SendMessageRequest) request).messageAttributes().get(name); + return value != null ? value.stringValue() : null; + } + return null; + } + static String getMessageId(SdkResponse response) { if (response instanceof SendMessageResponse) { return ((SendMessageResponse) response).messageId(); diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessage.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessage.java index a9a1eb0df725..966f76ec7732 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessage.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessage.java @@ -17,4 +17,8 @@ interface SqsMessage { Map messageAttributes(); Map attributesAsStrings(); + + String getMessageAttribute(String name); + + String getMessageId(); } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessageImpl.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessageImpl.java index a30a1096f7c4..63f7009d250b 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessageImpl.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessageImpl.java @@ -5,6 +5,8 @@ package io.opentelemetry.instrumentation.awssdk.v2_2; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; @@ -21,6 +23,14 @@ static SqsMessage wrap(Message message) { return new SqsMessageImpl(message); } + static List wrap(List messages) { + List result = new ArrayList<>(); + for (Message message : messages) { + result.add(wrap(message)); + } + return result; + } + @Override public Map messageAttributes() { return message.messageAttributes(); @@ -30,4 +40,15 @@ public Map messageAttributes() { public Map attributesAsStrings() { return message.attributesAsStrings(); } + + @Override + public String getMessageAttribute(String name) { + MessageAttributeValue value = message.messageAttributes().get(name); + return value != null ? value.stringValue() : null; + } + + @Override + public String getMessageId() { + return message.messageId(); + } } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsProcessRequest.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsProcessRequest.java index 386b35ba0ac5..a2bc70295a55 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsProcessRequest.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsProcessRequest.java @@ -7,7 +7,7 @@ import software.amazon.awssdk.core.interceptor.ExecutionAttributes; -final class SqsProcessRequest { +final class SqsProcessRequest extends AbstractSqsRequest { private final ExecutionAttributes request; private final SqsMessage message; @@ -20,6 +20,7 @@ public static SqsProcessRequest create(ExecutionAttributes request, SqsMessage m return new SqsProcessRequest(request, message); } + @Override public ExecutionAttributes getRequest() { return request; } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsProcessRequestAttributesGetter.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsProcessRequestAttributesGetter.java index 36470507cef4..e1f0ebc774d8 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsProcessRequestAttributesGetter.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsProcessRequestAttributesGetter.java @@ -6,6 +6,8 @@ package io.opentelemetry.instrumentation.awssdk.v2_2; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import java.util.Collections; +import java.util.List; import javax.annotation.Nullable; import software.amazon.awssdk.core.SdkRequest; @@ -58,6 +60,12 @@ public Long getMessagePayloadCompressedSize(SqsProcessRequest request) { @Override @Nullable public String getMessageId(SqsProcessRequest request, @Nullable Void response) { - return null; + return request.getMessage().getMessageId(); + } + + @Override + public List getMessageHeader(SqsProcessRequest request, String name) { + String value = request.getMessage().getMessageAttribute(name); + return value != null ? Collections.singletonList(value) : Collections.emptyList(); } } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsReceiveRequest.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsReceiveRequest.java new file mode 100644 index 000000000000..4d1ef63accb2 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsReceiveRequest.java @@ -0,0 +1,32 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v2_2; + +import java.util.List; +import software.amazon.awssdk.core.interceptor.ExecutionAttributes; + +final class SqsReceiveRequest extends AbstractSqsRequest { + private final ExecutionAttributes request; + private final List messages; + + private SqsReceiveRequest(ExecutionAttributes request, List messages) { + this.request = request; + this.messages = messages; + } + + public static SqsReceiveRequest create(ExecutionAttributes request, List messages) { + return new SqsReceiveRequest(request, messages); + } + + @Override + public ExecutionAttributes getRequest() { + return request; + } + + public List getMessages() { + return messages; + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsReceiveRequestAttributesGetter.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsReceiveRequestAttributesGetter.java new file mode 100644 index 000000000000..57b62d53525b --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsReceiveRequestAttributesGetter.java @@ -0,0 +1,74 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v2_2; + +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import javax.annotation.Nullable; +import software.amazon.awssdk.core.SdkRequest; + +enum SqsReceiveRequestAttributesGetter + implements MessagingAttributesGetter { + INSTANCE; + + @Override + public String getSystem(SqsReceiveRequest request) { + return "AmazonSQS"; + } + + @Override + public String getDestination(SqsReceiveRequest request) { + SdkRequest sdkRequest = + request.getRequest().getAttribute(TracingExecutionInterceptor.SDK_REQUEST_ATTRIBUTE); + String queueUrl = SqsAccess.getQueueUrl(sdkRequest); + if (queueUrl != null) { + int i = queueUrl.lastIndexOf('/'); + if (i > 0) { + return queueUrl.substring(i + 1); + } + } + return null; + } + + @Override + public boolean isTemporaryDestination(SqsReceiveRequest request) { + return false; + } + + @Override + @Nullable + public String getConversationId(SqsReceiveRequest request) { + return null; + } + + @Override + @Nullable + public Long getMessagePayloadSize(SqsReceiveRequest request) { + return null; + } + + @Override + @Nullable + public Long getMessagePayloadCompressedSize(SqsReceiveRequest request) { + return null; + } + + @Override + @Nullable + public String getMessageId(SqsReceiveRequest request, @Nullable Response response) { + return null; + } + + @Override + public List getMessageHeader(SqsReceiveRequest request, String name) { + return StreamSupport.stream(request.getMessages().spliterator(), false) + .map(message -> message.getMessageAttribute(name)) + .filter(value -> value != null) + .collect(Collectors.toList()); + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingExecutionInterceptor.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingExecutionInterceptor.java index ce25989193bd..f717b1efc44c 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingExecutionInterceptor.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingExecutionInterceptor.java @@ -64,7 +64,7 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor { new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".TracingMessages"); private final Instrumenter requestInstrumenter; - private final Instrumenter consumerReceiveInstrumenter; + private final Instrumenter consumerReceiveInstrumenter; private final Instrumenter consumerProcessInstrumenter; private final Instrumenter producerInstrumenter; private final boolean captureExperimentalSpanAttributes; @@ -73,7 +73,7 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor { AttributeKey.stringKey("aws.http.error_message"); static final String HTTP_FAILURE_EVENT = "HTTP request failure"; - Instrumenter getConsumerReceiveInstrumenter() { + Instrumenter getConsumerReceiveInstrumenter() { return consumerReceiveInstrumenter; } @@ -97,7 +97,7 @@ boolean shouldUseXrayPropagator() { TracingExecutionInterceptor( Instrumenter requestInstrumenter, - Instrumenter consumerReceiveInstrumenter, + Instrumenter consumerReceiveInstrumenter, Instrumenter consumerProcessInstrumenter, Instrumenter producerInstrumenter, boolean captureExperimentalSpanAttributes, diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTest.groovy b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTest.groovy index 19391ce280cd..2e5d73a8427b 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTest.groovy @@ -10,6 +10,8 @@ import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration import software.amazon.awssdk.services.sqs.SqsAsyncClient import software.amazon.awssdk.services.sqs.SqsClient +import static java.util.Collections.singletonList + abstract class Aws2SqsTracingTest extends AbstractAws2SqsTracingTest implements LibraryTestTrait { static AwsSdkTelemetry telemetry @@ -17,6 +19,8 @@ abstract class Aws2SqsTracingTest extends AbstractAws2SqsTracingTest implements def telemetryBuilder = AwsSdkTelemetry.builder(getOpenTelemetry()) .setCaptureExperimentalSpanAttributes(true) .setMessagingReceiveInstrumentationEnabled(true) + .setCapturedHeaders(singletonList("test-message-header")) + configure(telemetryBuilder) telemetry = telemetryBuilder.build() } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsSuppressReceiveSpansTest.groovy b/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsSuppressReceiveSpansTest.groovy index fbd4bb5b707c..575ecf161c50 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsSuppressReceiveSpansTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsSuppressReceiveSpansTest.groovy @@ -181,6 +181,7 @@ abstract class AbstractAws2SqsSuppressReceiveSpansTest extends InstrumentationSp "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" "$SemanticAttributes.MESSAGING_OPERATION" "process" + "$SemanticAttributes.MESSAGING_MESSAGE_ID" String "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } } } @@ -357,6 +358,7 @@ abstract class AbstractAws2SqsSuppressReceiveSpansTest extends InstrumentationSp "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" "$SemanticAttributes.MESSAGING_OPERATION" "process" + "$SemanticAttributes.MESSAGING_MESSAGE_ID" String "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } } } @@ -384,6 +386,7 @@ abstract class AbstractAws2SqsSuppressReceiveSpansTest extends InstrumentationSp "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" "$SemanticAttributes.MESSAGING_OPERATION" "process" + "$SemanticAttributes.MESSAGING_MESSAGE_ID" String "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } } } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.groovy b/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.groovy index 246e3824dbbe..41ac5d129676 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.groovy @@ -115,7 +115,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification { } } - void assertSqsTraces(withParent = false) { + void assertSqsTraces(withParent = false, captureHeaders = false) { assertTraces(3) { SpanData publishSpan trace(0, 1) { @@ -164,6 +164,9 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification { "$SemanticAttributes.MESSAGING_MESSAGE_ID" String "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long } + if (captureHeaders) { + "messaging.header.test_message_header" { it == ["test"] } + } } } publishSpan = span(0) @@ -225,6 +228,9 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification { "$SemanticAttributes.MESSAGING_OPERATION" "receive" "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long } + if (captureHeaders) { + "messaging.header.test_message_header" { it == ["test"] } + } } } span(1 + offset) { @@ -244,7 +250,11 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification { "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" "$SemanticAttributes.MESSAGING_OPERATION" "process" + "$SemanticAttributes.MESSAGING_MESSAGE_ID" String "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } + if (captureHeaders) { + "messaging.header.test_message_header" { it == ["test"] } + } } } span(2 + offset) { @@ -276,6 +286,31 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification { assertSqsTraces() } + def "capture message header as span attribute"() { + setup: + def builder = SqsClient.builder() + configureSdkClient(builder) + def client = configureSqsClient(builder.build()) + + client.createQueue(createQueueRequest) + + when: + SendMessageRequest newSendMessageRequest = sendMessageRequest.toBuilder().messageAttributes( + Collections.singletonMap("test-message-header", + MessageAttributeValue.builder().dataType("String").stringValue("test").build()) + ).build() + client.sendMessage(newSendMessageRequest) + + ReceiveMessageRequest newReceiveMessageRequest = receiveMessageRequest.toBuilder() + .messageAttributeNames("test-message-header").build() + def resp = client.receiveMessage(newReceiveMessageRequest) + + then: + resp.messages.size() == 1 + resp.messages.each {message -> runWithSpan("process child") {}} + assertSqsTraces(false, true) + } + def "simple sqs producer-consumer services with parent: sync"() { setup: def builder = SqsClient.builder() @@ -420,6 +455,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification { "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" "$SemanticAttributes.MESSAGING_OPERATION" "process" + "$SemanticAttributes.MESSAGING_MESSAGE_ID" String "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } } }