Skip to content

Commit

Permalink
Merge branch 'googleapis:main' into interval_support
Browse files Browse the repository at this point in the history
  • Loading branch information
sagarwaal authored and Sagar Agarwal committed Oct 22, 2024
2 parents df6ebe7 + 16cc6ee commit f9517a0
Show file tree
Hide file tree
Showing 28 changed files with 843 additions and 55 deletions.
6 changes: 0 additions & 6 deletions google-cloud-spanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -460,12 +460,6 @@
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>24.1.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
<profiles>
<profile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.RequestOptions;
Expand Down Expand Up @@ -893,6 +894,13 @@ public void onDone(boolean withBeginTransaction) {
this.session.onReadDone();
}

/**
* For transactions other than read-write, the MultiplexedSessionPrecommitToken will not be
* present in the RPC response. In such cases, this method will be a no-op.
*/
@Override
public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}

private ResultSet readInternal(
String table,
@Nullable String index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.protobuf.ListValue;
import com.google.protobuf.ProtocolMessageEnum;
import com.google.protobuf.Value.KindCase;
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.Transaction;
import java.io.IOException;
import java.io.Serializable;
Expand Down Expand Up @@ -57,6 +58,12 @@ void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)

/** Called when the read finishes normally. */
void onDone(boolean withBeginTransaction);

/**
* Called when the RPC response contains a MultiplexedSessionPrecommitToken. A precommit token
* will be included if the read-write transaction is executed on a multiplexed session.
*/
void onPrecommitToken(MultiplexedSessionPrecommitToken token);
}

static final class LazyByteArray implements Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;

/** Implementation of {@link AsyncTransactionManager}. */
final class AsyncTransactionManagerImpl
Expand Down Expand Up @@ -80,7 +81,19 @@ public TransactionContextFutureImpl beginAsync() {

private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
txnState = TransactionState.STARTED;
txn = session.newTransaction(options);

// Determine the latest transactionId when using a multiplexed session.
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
if (txn != null && session.getIsMultiplexed() && !firstAttempt) {
// Use the current transactionId if available, otherwise fallback to the previous aborted
// transactionId.
multiplexedSessionPreviousTransactionId =
txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId();
}

txn =
session.newTransaction(
options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId);
if (firstAttempt) {
session.setActive(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_HASH_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_NAME_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_UID_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.DIRECT_PATH_ENABLED_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.INSTANCE_CONFIG_ID_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.LOCATION_ID_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.PROJECT_ID_KEY;
Expand Down Expand Up @@ -83,6 +84,7 @@ Map<String, String> createClientAttributes(String projectId, String client_name)
clientAttributes.put(LOCATION_ID_KEY.getKey(), detectClientLocation());
clientAttributes.put(PROJECT_ID_KEY.getKey(), projectId);
// TODO: Replace this with real value.
clientAttributes.put(DIRECT_PATH_ENABLED_KEY.getKey(), "false");
clientAttributes.put(INSTANCE_CONFIG_ID_KEY.getKey(), "unknown");
clientAttributes.put(CLIENT_NAME_KEY.getKey(), client_name);
String clientUid = getDefaultTaskValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public void addAttributes(Map<String, String> attributes) {
for (ApiTracer child : children) {
if (child instanceof MetricsTracer) {
MetricsTracer metricsTracer = (MetricsTracer) child;
attributes.forEach((key, value) -> metricsTracer.addAttributes(key, value));
metricsTracer.addAttributes(attributes);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class DatabaseClientImpl implements DatabaseClient {
/* useMultiplexedSessionBlindWrite = */ false,
/* multiplexedSessionDatabaseClient = */ null,
tracer,
false);
/* useMultiplexedSessionForRW = */ false);
}

DatabaseClientImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class GrpcResultSet extends AbstractResultSet<List<Object>> implements ProtobufR

GrpcResultSet(
CloseableIterator<PartialResultSet> iterator, Listener listener, DecodeMode decodeMode) {
this.iterator = new GrpcValueIterator(iterator);
this.iterator = new GrpcValueIterator(iterator, listener);
this.listener = listener;
this.decodeMode = decodeMode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkState;

import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
import com.google.cloud.spanner.AbstractResultSet.Listener;
import com.google.common.collect.AbstractIterator;
import com.google.protobuf.ListValue;
import com.google.protobuf.Value.KindCase;
Expand All @@ -44,9 +45,11 @@ private enum StreamValue {
private PartialResultSet current;
private int pos;
private ResultSetStats statistics;
private final Listener listener;

GrpcValueIterator(CloseableIterator<PartialResultSet> stream) {
GrpcValueIterator(CloseableIterator<PartialResultSet> stream, Listener listener) {
this.stream = stream;
this.listener = listener;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -154,6 +157,10 @@ private boolean ensureReady(StreamValue requiredValue) throws SpannerException {
ErrorCode.INTERNAL, "Invalid type metadata: " + e.getMessage(), e);
}
}
// collect the precommit token from each PartialResultSet
if (current.hasPrecommitToken()) {
listener.onPrecommitToken(current.getPrecommitToken());
}
if (current.hasStats()) {
statistics = current.getStats();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ static void throwIfTransactionsPending() {
}
}

static TransactionOptions createReadWriteTransactionOptions(Options options) {
static TransactionOptions createReadWriteTransactionOptions(
Options options, ByteString previousTransactionId) {
TransactionOptions.Builder transactionOptions = TransactionOptions.newBuilder();
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
transactionOptions.setExcludeTxnFromChangeStreams(true);
Expand All @@ -78,6 +79,10 @@ static TransactionOptions createReadWriteTransactionOptions(Options options) {
if (options.withOptimisticLock() == Boolean.TRUE) {
readWrite.setReadLockMode(TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC);
}
if (previousTransactionId != null
&& previousTransactionId != com.google.protobuf.ByteString.EMPTY) {
readWrite.setMultiplexedSessionPreviousTransactionId(previousTransactionId);
}
transactionOptions.setReadWrite(readWrite);
return transactionOptions.build();
}
Expand Down Expand Up @@ -427,13 +432,17 @@ public void close() {
}

ApiFuture<ByteString> beginTransactionAsync(
Options transactionOptions, boolean routeToLeader, Map<SpannerRpc.Option, ?> channelHint) {
Options transactionOptions,
boolean routeToLeader,
Map<SpannerRpc.Option, ?> channelHint,
ByteString previousTransactionId) {
final SettableApiFuture<ByteString> res = SettableApiFuture.create();
final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION);
final BeginTransactionRequest request =
BeginTransactionRequest.newBuilder()
.setSession(getName())
.setOptions(createReadWriteTransactionOptions(transactionOptions))
.setOptions(
createReadWriteTransactionOptions(transactionOptions, previousTransactionId))
.build();
final ApiFuture<Transaction> requestFuture;
try (IScope ignore = tracer.withSpan(span)) {
Expand Down Expand Up @@ -469,11 +478,12 @@ ApiFuture<ByteString> beginTransactionAsync(
return res;
}

TransactionContextImpl newTransaction(Options options) {
TransactionContextImpl newTransaction(Options options, ByteString previousTransactionId) {
return TransactionContextImpl.newBuilder()
.setSession(this)
.setOptions(options)
.setTransactionId(null)
.setPreviousTransactionId(previousTransactionId)
.setOptions(options)
.setTrackTransactionStarter(spanner.getOptions().isTrackTransactionStarter())
.setRpc(spanner.getRpc())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1726,14 +1726,13 @@ private ApiTracerFactory getDefaultApiTracerFactory() {
private ApiTracerFactory createMetricsApiTracerFactory() {
OpenTelemetry openTelemetry =
this.builtInOpenTelemetryMetricsProvider.getOrCreateOpenTelemetry(
getDefaultProjectId(), getCredentials());
this.getProjectId(), getCredentials());

return openTelemetry != null
? new MetricsTracerFactory(
new OpenTelemetryMetricsRecorder(openTelemetry, BuiltInMetricsConstant.METER_NAME),
builtInOpenTelemetryMetricsProvider.createClientAttributes(
getDefaultProjectId(),
"spanner-java/" + GaxProperties.getLibraryVersion(getClass())))
this.getProjectId(), "spanner-java/" + GaxProperties.getLibraryVersion(getClass())))
: null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;

/** Implementation of {@link TransactionManager}. */
final class TransactionManagerImpl implements TransactionManager, SessionTransaction {
Expand Down Expand Up @@ -53,7 +54,7 @@ public void setSpan(ISpan span) {
public TransactionContext begin() {
Preconditions.checkState(txn == null, "begin can only be called once");
try (IScope s = tracer.withSpan(span)) {
txn = session.newTransaction(options);
txn = session.newTransaction(options, /* previousTransactionId = */ ByteString.EMPTY);
session.setActive(this);
txnState = TransactionState.STARTED;
return txn;
Expand Down Expand Up @@ -102,7 +103,18 @@ public TransactionContext resetForRetry() {
}
try (IScope s = tracer.withSpan(span)) {
boolean useInlinedBegin = txn.transactionId != null;
txn = session.newTransaction(options);

// Determine the latest transactionId when using a multiplexed session.
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
if (session.getIsMultiplexed()) {
// Use the current transactionId if available, otherwise fallback to the previous aborted
// transactionId.
multiplexedSessionPreviousTransactionId =
txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId();
}
txn =
session.newTransaction(
options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId);
if (!useInlinedBegin) {
txn.ensureTxn();
}
Expand Down
Loading

0 comments on commit f9517a0

Please sign in to comment.