Skip to content

Commit

Permalink
feat: Improve tracing by adding attributes (#3576)
Browse files Browse the repository at this point in the history
  • Loading branch information
surbhigarg92 authored Jan 3, 2025
1 parent 1af8e46 commit eee333b
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,7 @@ ResultSet readInternalWithOptions(
SpannerImpl.READ,
span,
tracer,
tracer.createTableAttributes(table, readOptions),
session.getErrorHandler(),
rpc.getReadRetrySettings(),
rpc.getReadRetryableCodes()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
import com.google.common.base.Function;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.spanner.v1.BatchWriteResponse;
import io.opentelemetry.api.common.Attributes;
import javax.annotation.Nullable;

class DatabaseClientImpl implements DatabaseClient {
private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction";
private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction";
private static final String PARTITION_DML_TRANSACTION = "CloudSpanner.PartitionDMLTransaction";
private final TraceWrapper tracer;
private Attributes commonAttributes;
@VisibleForTesting final String clientId;
@VisibleForTesting final SessionPool pool;
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
Expand All @@ -50,7 +52,8 @@ class DatabaseClientImpl implements DatabaseClient {
/* multiplexedSessionDatabaseClient = */ null,
/* useMultiplexedSessionPartitionedOps= */ false,
tracer,
/* useMultiplexedSessionForRW = */ false);
/* useMultiplexedSessionForRW = */ false,
Attributes.empty());
}

@VisibleForTesting
Expand All @@ -62,7 +65,8 @@ class DatabaseClientImpl implements DatabaseClient {
/* multiplexedSessionDatabaseClient = */ null,
/* useMultiplexedSessionPartitionedOps= */ false,
tracer,
/* useMultiplexedSessionForRW = */ false);
/* useMultiplexedSessionForRW = */ false,
Attributes.empty());
}

DatabaseClientImpl(
Expand All @@ -72,14 +76,16 @@ class DatabaseClientImpl implements DatabaseClient {
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient,
boolean useMultiplexedSessionPartitionedOps,
TraceWrapper tracer,
boolean useMultiplexedSessionForRW) {
boolean useMultiplexedSessionForRW,
Attributes commonAttributes) {
this.clientId = clientId;
this.pool = pool;
this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite;
this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient;
this.useMultiplexedSessionPartitionedOps = useMultiplexedSessionPartitionedOps;
this.tracer = tracer;
this.useMultiplexedSessionForRW = useMultiplexedSessionForRW;
this.commonAttributes = commonAttributes;
}

@VisibleForTesting
Expand Down Expand Up @@ -138,7 +144,7 @@ public Timestamp write(final Iterable<Mutation> mutations) throws SpannerExcepti
public CommitResponse writeWithOptions(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
try (IScope s = tracer.withSpan(span)) {
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options);
Expand All @@ -161,7 +167,7 @@ public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws Spa
public CommitResponse writeAtLeastOnceWithOptions(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
try (IScope s = tracer.withSpan(span)) {
if (useMultiplexedSessionBlindWrite && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient()
Expand All @@ -181,7 +187,7 @@ public CommitResponse writeAtLeastOnceWithOptions(
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
final Iterable<MutationGroup> mutationGroups, final TransactionOption... options)
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options));
} catch (RuntimeException e) {
Expand All @@ -194,7 +200,7 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(

@Override
public ReadContext singleUse() {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().singleUse();
} catch (RuntimeException e) {
Expand All @@ -206,7 +212,7 @@ public ReadContext singleUse() {

@Override
public ReadContext singleUse(TimestampBound bound) {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().singleUse(bound);
} catch (RuntimeException e) {
Expand All @@ -218,7 +224,7 @@ public ReadContext singleUse(TimestampBound bound) {

@Override
public ReadOnlyTransaction singleUseReadOnlyTransaction() {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().singleUseReadOnlyTransaction();
} catch (RuntimeException e) {
Expand All @@ -230,7 +236,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() {

@Override
public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().singleUseReadOnlyTransaction(bound);
} catch (RuntimeException e) {
Expand All @@ -242,7 +248,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {

@Override
public ReadOnlyTransaction readOnlyTransaction() {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().readOnlyTransaction();
} catch (RuntimeException e) {
Expand All @@ -254,7 +260,7 @@ public ReadOnlyTransaction readOnlyTransaction() {

@Override
public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSession().readOnlyTransaction(bound);
} catch (RuntimeException e) {
Expand All @@ -266,7 +272,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {

@Override
public TransactionRunner readWriteTransaction(TransactionOption... options) {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSessionForRW().readWriteTransaction(options);
} catch (RuntimeException e) {
Expand All @@ -278,7 +284,7 @@ public TransactionRunner readWriteTransaction(TransactionOption... options) {

@Override
public TransactionManager transactionManager(TransactionOption... options) {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSessionForRW().transactionManager(options);
} catch (RuntimeException e) {
Expand All @@ -290,7 +296,7 @@ public TransactionManager transactionManager(TransactionOption... options) {

@Override
public AsyncRunner runAsync(TransactionOption... options) {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSessionForRW().runAsync(options);
} catch (RuntimeException e) {
Expand All @@ -302,7 +308,7 @@ public AsyncRunner runAsync(TransactionOption... options) {

@Override
public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
try (IScope s = tracer.withSpan(span)) {
return getMultiplexedSessionForRW().transactionManagerAsync(options);
} catch (RuntimeException e) {
Expand All @@ -322,7 +328,7 @@ public long executePartitionedUpdate(final Statement stmt, final UpdateOption...

private long executePartitionedUpdateWithPooledSession(
final Statement stmt, final UpdateOption... options) {
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION);
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION, commonAttributes);
try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.opentelemetry.api.common.Attributes;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -125,7 +126,8 @@ private BatchCreateSessionsRunnable(
public void run() {
List<SessionImpl> sessions;
int remainingSessionsToCreate = sessionCount;
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.BATCH_CREATE_SESSIONS);
ISpan span =
spanner.getTracer().spanBuilder(SpannerImpl.BATCH_CREATE_SESSIONS, commonAttributes);
try (IScope s = spanner.getTracer().withSpan(span)) {
spanner
.getTracer()
Expand Down Expand Up @@ -170,6 +172,7 @@ interface SessionConsumer {
private final ExecutorFactory<ScheduledExecutorService> executorFactory;
private final ScheduledExecutorService executor;
private final DatabaseId db;
private final Attributes commonAttributes;

@GuardedBy("this")
private volatile long sessionChannelCounter;
Expand All @@ -182,6 +185,7 @@ interface SessionConsumer {
this.db = db;
this.executorFactory = executorFactory;
this.executor = executorFactory.get();
this.commonAttributes = spanner.getTracer().createCommonAttributes(db);
}

@Override
Expand All @@ -205,7 +209,7 @@ SessionImpl createSession() {
synchronized (this) {
options = optionMap(SessionOption.channelHint(sessionChannelCounter++));
}
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION);
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.commonAttributes);
try (IScope s = spanner.getTracer().withSpan(span)) {
com.google.spanner.v1.Session session =
spanner
Expand Down Expand Up @@ -250,7 +254,10 @@ void createMultiplexedSession(SessionConsumer consumer) {
* GRPC channel. In case of an error during the gRPC calls, an exception will be thrown.
*/
SessionImpl createMultiplexedSession() {
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION);
ISpan span =
spanner
.getTracer()
.spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION, this.commonAttributes);
try (IScope s = spanner.getTracer().withSpan(span)) {
com.google.spanner.v1.Session session =
spanner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,8 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
getOptions().getSessionPoolOptions().getUseMultiplexedSessionBlindWrite(),
multiplexedSessionDatabaseClient,
getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps(),
useMultiplexedSessionForRW);
useMultiplexedSessionForRW,
this.tracer.createCommonAttributes(db));
dbClients.put(db, dbClient);
return dbClient;
}
Expand All @@ -329,15 +330,17 @@ DatabaseClientImpl createDatabaseClient(
boolean useMultiplexedSessionBlindWrite,
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient,
boolean useMultiplexedSessionPartitionedOps,
boolean useMultiplexedSessionForRW) {
boolean useMultiplexedSessionForRW,
Attributes commonAttributes) {
return new DatabaseClientImpl(
clientId,
pool,
useMultiplexedSessionBlindWrite,
multiplexedSessionClient,
useMultiplexedSessionPartitionedOps,
tracer,
useMultiplexedSessionForRW);
useMultiplexedSessionForRW,
commonAttributes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,14 @@ class TraceWrapper {
AttributeKey.stringKey("transaction.tag");
private static final AttributeKey<String> STATEMENT_TAG_KEY =
AttributeKey.stringKey("statement.tag");
private static final AttributeKey<String> INSTANCE_NAME_KEY =
AttributeKey.stringKey("instance.name");
private static final AttributeKey<String> DB_NAME_KEY = AttributeKey.stringKey("db.name");
private static final AttributeKey<String> DB_STATEMENT_KEY =
AttributeKey.stringKey("db.statement");
private static final AttributeKey<List<String>> DB_STATEMENT_ARRAY_KEY =
AttributeKey.stringArrayKey("db.statement");
private static final AttributeKey<String> DB_TABLE_NAME_KEY = AttributeKey.stringKey("db.table");
private static final AttributeKey<String> THREAD_NAME_KEY = AttributeKey.stringKey("thread.name");

private final Tracer openCensusTracer;
Expand All @@ -61,8 +65,8 @@ ISpan spanBuilder(String spanName) {
return spanBuilder(spanName, Attributes.empty());
}

ISpan spanBuilder(String spanName, TransactionOption... options) {
return spanBuilder(spanName, createTransactionAttributes(options));
ISpan spanBuilder(String spanName, Attributes commonAttributes, TransactionOption... options) {
return spanBuilder(spanName, createTransactionAttributes(commonAttributes, options));
}

ISpan spanBuilder(String spanName, Attributes attributes) {
Expand Down Expand Up @@ -137,18 +141,20 @@ IScope withSpan(ISpan span) {
}
}

Attributes createTransactionAttributes(TransactionOption... options) {
Attributes createTransactionAttributes(
Attributes commonAttributes, TransactionOption... options) {
AttributesBuilder builder = commonAttributes.toBuilder();
if (options != null && options.length > 0) {
Optional<TagOption> tagOption =
Arrays.stream(options)
.filter(option -> option instanceof TagOption)
.map(option -> (TagOption) option)
.findAny();
if (tagOption.isPresent()) {
return Attributes.of(TRANSACTION_TAG_KEY, tagOption.get().getTag());
builder.put(TRANSACTION_TAG_KEY, tagOption.get().getTag());
}
}
return Attributes.empty();
return builder.build();
}

Attributes createStatementAttributes(Statement statement, Options options) {
Expand Down Expand Up @@ -185,6 +191,22 @@ Attributes createStatementBatchAttributes(Iterable<Statement> statements, Option
return Attributes.empty();
}

Attributes createTableAttributes(String tableName, Options options) {
AttributesBuilder builder = Attributes.builder();
builder.put(DB_TABLE_NAME_KEY, tableName);
if (options != null && options.hasTag()) {
builder.put(STATEMENT_TAG_KEY, options.tag());
}
return builder.build();
}

Attributes createCommonAttributes(DatabaseId db) {
AttributesBuilder builder = Attributes.builder();
builder.put(DB_NAME_KEY, db.getDatabase());
builder.put(INSTANCE_NAME_KEY, db.getInstanceId().getInstance());
return builder.build();
}

private static String getTraceThreadName() {
return MoreObjects.firstNonNull(
Context.current().get(OpenTelemetryContextKeys.THREAD_NAME_KEY),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SessionPool.SessionFutureWrapper;
import com.google.cloud.spanner.testing.RemoteSpannerHelper;
import io.opentelemetry.api.common.Attributes;

/**
* Subclass of {@link IntegrationTestEnv} that allows the user to specify when the underlying
Expand Down Expand Up @@ -52,7 +53,8 @@ DatabaseClientImpl createDatabaseClient(
boolean useMultiplexedSessionBlindWriteIgnore,
MultiplexedSessionDatabaseClient ignore,
boolean useMultiplexedSessionPartitionedOpsIgnore,
boolean useMultiplexedSessionForRWIgnore) {
boolean useMultiplexedSessionForRWIgnore,
Attributes attributes) {
return new DatabaseClientWithClosedSessionImpl(clientId, pool, tracer);
}
}
Expand Down
Loading

0 comments on commit eee333b

Please sign in to comment.