Skip to content

Commit

Permalink
chore(spanner): fix unit tests for mux rw (#3577)
Browse files Browse the repository at this point in the history
* chore(Spanner): fix tests for mux rw

* chore(Spanner): fix tests

* chore(spanner): fix tests
  • Loading branch information
harshachinta authored Jan 3, 2025
1 parent eee333b commit c74fe83
Show file tree
Hide file tree
Showing 12 changed files with 349 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,15 @@ public void onSessionReady(SessionImpl session) {
// initiate a begin transaction request to verify if read-write transactions are
// supported using multiplexed sessions.
if (sessionClient
.getSpanner()
.getOptions()
.getSessionPoolOptions()
.getUseMultiplexedSessionForRW()) {
.getSpanner()
.getOptions()
.getSessionPoolOptions()
.getUseMultiplexedSessionForRW()
&& !sessionClient
.getSpanner()
.getOptions()
.getSessionPoolOptions()
.getSkipVerifyBeginTransactionForMuxRW()) {
verifyBeginTransactionWithRWOnMultiplexedSessionAsync(session.getName());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class SessionPoolOptions {

// TODO: Change to use java.time.Duration.
private final Duration multiplexedSessionMaintenanceDuration;
private final boolean skipVerifyingBeginTransactionForMuxRW;

private SessionPoolOptions(Builder builder) {
// minSessions > maxSessions is only possible if the user has only set a value for maxSessions.
Expand Down Expand Up @@ -132,6 +133,7 @@ private SessionPoolOptions(Builder builder) {
? useMultiplexedSessionFromEnvVariablePartitionedOps
: builder.useMultiplexedSessionPartitionedOps;
this.multiplexedSessionMaintenanceDuration = builder.multiplexedSessionMaintenanceDuration;
this.skipVerifyingBeginTransactionForMuxRW = builder.skipVerifyingBeginTransactionForMuxRW;
}

@Override
Expand Down Expand Up @@ -169,8 +171,10 @@ public boolean equals(Object o) {
&& Objects.equals(this.useMultiplexedSession, other.useMultiplexedSession)
&& Objects.equals(this.useMultiplexedSessionForRW, other.useMultiplexedSessionForRW)
&& Objects.equals(
this.multiplexedSessionMaintenanceDuration,
other.multiplexedSessionMaintenanceDuration);
this.multiplexedSessionMaintenanceDuration, other.multiplexedSessionMaintenanceDuration)
&& Objects.equals(
this.skipVerifyingBeginTransactionForMuxRW,
other.skipVerifyingBeginTransactionForMuxRW);
}

@Override
Expand Down Expand Up @@ -199,7 +203,8 @@ public int hashCode() {
this.poolMaintainerClock,
this.useMultiplexedSession,
this.useMultiplexedSessionForRW,
this.multiplexedSessionMaintenanceDuration);
this.multiplexedSessionMaintenanceDuration,
this.skipVerifyingBeginTransactionForMuxRW);
}

public Builder toBuilder() {
Expand Down Expand Up @@ -392,6 +397,12 @@ Duration getMultiplexedSessionMaintenanceDuration() {
return multiplexedSessionMaintenanceDuration;
}

@VisibleForTesting
@InternalApi
boolean getSkipVerifyBeginTransactionForMuxRW() {
return skipVerifyingBeginTransactionForMuxRW;
}

public static Builder newBuilder() {
return new Builder();
}
Expand Down Expand Up @@ -607,6 +618,7 @@ public static class Builder {

private Duration multiplexedSessionMaintenanceDuration = Duration.ofDays(7);
private Clock poolMaintainerClock = Clock.INSTANCE;
private boolean skipVerifyingBeginTransactionForMuxRW = false;

private static Position getReleaseToPositionFromSystemProperty() {
// NOTE: This System property is a beta feature. Support for it can be removed in the future.
Expand Down Expand Up @@ -650,6 +662,7 @@ private Builder(SessionPoolOptions options) {
this.useMultiplexedSessionPartitionedOps = options.useMultiplexedSessionForPartitionedOps;
this.multiplexedSessionMaintenanceDuration = options.multiplexedSessionMaintenanceDuration;
this.poolMaintainerClock = options.poolMaintainerClock;
this.skipVerifyingBeginTransactionForMuxRW = options.skipVerifyingBeginTransactionForMuxRW;
}

/**
Expand Down Expand Up @@ -872,6 +885,18 @@ Builder setMultiplexedSessionMaintenanceDuration(
return this;
}

// The additional BeginTransaction RPC for multiplexed session read-write is causing
// unexpected behavior in mock Spanner tests that rely on mocking the BeginTransaction RPC.
// Invoking this method with `true` skips sending the BeginTransaction RPC when the multiplexed
// session is created for the first time during client initialization.
// This is only used for tests.
@VisibleForTesting
Builder setSkipVerifyingBeginTransactionForMuxRW(
boolean skipVerifyingBeginTransactionForMuxRW) {
this.skipVerifyingBeginTransactionForMuxRW = skipVerifyingBeginTransactionForMuxRW;
return this;
}

/**
* Sets whether the client should automatically execute a background query to detect the dialect
* that is used by the database or not. Set this option to true if you do not know what the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ public void tearDown() {
@Test
public void
testPoolMaintainer_whenInactiveTransactionAndSessionIsNotFoundOnBackend_removeSessionsFromPool() {
assumeFalse(
"Session pool maintainer test skipped for multiplexed sessions",
isMultiplexedSessionsEnabledForRW());
FakeClock poolMaintainerClock = new FakeClock();
InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions =
InactiveTransactionRemovalOptions.newBuilder()
Expand Down Expand Up @@ -347,6 +350,9 @@ public void tearDown() {
@Test
public void
testPoolMaintainer_whenInactiveTransactionAndSessionExistsOnBackend_removeSessionsFromPool() {
assumeFalse(
"Session leaks tests are skipped for multiplexed sessions",
isMultiplexedSessionsEnabledForRW());
FakeClock poolMaintainerClock = new FakeClock();
InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions =
InactiveTransactionRemovalOptions.newBuilder()
Expand Down Expand Up @@ -482,6 +488,9 @@ public void testPoolMaintainer_whenLongRunningPartitionedUpdateRequest_takeNoAct
*/
@Test
public void testPoolMaintainer_whenPDMLFollowedByInactiveTransaction_removeSessionsFromPool() {
assumeFalse(
"Session leaks tests are skipped for multiplexed sessions",
isMultiplexedSessionsEnabledForRW());
FakeClock poolMaintainerClock = new FakeClock();
InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions =
InactiveTransactionRemovalOptions.newBuilder()
Expand Down Expand Up @@ -3085,6 +3094,7 @@ public void testDatabaseOrInstanceDoesNotExistOnCreate() {
.run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT)));
// No additional requests should have been sent by the client.
// Note that in case of the use of multiplexed sessions, then we have 2 requests:
// Note that in case of the use of regular sessions, then we have 1 request:
// 1. BatchCreateSessions for the session pool.
// 2. CreateSession for the multiplexed session.
assertThat(mockSpanner.getRequests())
Expand Down Expand Up @@ -3211,9 +3221,16 @@ public void testDatabaseOrInstanceIsDeletedAndThenRecreated() throws Exception {
ResourceNotFoundException.class, () -> dbClient.singleUse().executeQuery(SELECT1));
}

assertThrows(
ResourceNotFoundException.class,
() -> dbClient.readWriteTransaction().run(transaction -> null));
if (!spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()) {
// We only verify this for read-write transactions if we are not using multiplexed
// sessions. For multiplexed sessions, we don't need any special handling, as deleting the
// database will also invalidate the multiplexed session, and trying to continue to use it
// will continue to return an error.
assertThrows(
ResourceNotFoundException.class,
() -> dbClient.readWriteTransaction().run(transaction -> null));
}

assertThat(mockSpanner.getRequests()).isEmpty();
// Now get a new database client. Normally multiple calls to Spanner#getDatabaseClient will
// return the same instance, but not when the instance has been invalidated by a
Expand Down Expand Up @@ -3300,13 +3317,18 @@ public void testAllowNestedTransactions() throws InterruptedException {
Thread.sleep(1L);
}
assertThat(client.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions);
int expectedMinSessions =
spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()
? minSessions
: minSessions - 1;
Long res =
client
.readWriteTransaction()
.allowNestedTransaction()
.run(
transaction -> {
assertThat(client.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions - 1);
assertThat(client.pool.getNumberOfSessionsInPool())
.isEqualTo(expectedMinSessions);
return transaction.executeUpdate(UPDATE_STATEMENT);
});
assertThat(res).isEqualTo(UPDATE_COUNT);
Expand All @@ -3333,6 +3355,9 @@ public void testNestedTransactionsUsingTwoDatabases() throws InterruptedExceptio
}
assertThat(client1.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions);
assertThat(client2.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions);
// When read-write transaction uses multiplexed sessions, then sessions are not checked out from
// the session pool.
int expectedMinSessions = isMultiplexedSessionsEnabledForRW() ? minSessions : minSessions - 1;
Long res =
client1
.readWriteTransaction()
Expand All @@ -3341,7 +3366,8 @@ public void testNestedTransactionsUsingTwoDatabases() throws InterruptedExceptio
transaction -> {
// Client1 should have 1 session checked out.
// Client2 should have 0 sessions checked out.
assertThat(client1.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions - 1);
assertThat(client1.pool.getNumberOfSessionsInPool())
.isEqualTo(expectedMinSessions);
assertThat(client2.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions);
Long add =
client2
Expand All @@ -3350,9 +3376,9 @@ public void testNestedTransactionsUsingTwoDatabases() throws InterruptedExceptio
transaction1 -> {
// Both clients should now have 1 session checked out.
assertThat(client1.pool.getNumberOfSessionsInPool())
.isEqualTo(minSessions - 1);
.isEqualTo(expectedMinSessions);
assertThat(client2.pool.getNumberOfSessionsInPool())
.isEqualTo(minSessions - 1);
.isEqualTo(expectedMinSessions);
try (ResultSet rs = transaction1.executeQuery(SELECT1)) {
if (rs.next()) {
return rs.getLong(0);
Expand Down Expand Up @@ -5090,6 +5116,9 @@ public void testRetryOnResourceExhausted() {

@Test
public void testSessionPoolExhaustedError_containsStackTraces() {
assumeFalse(
"Session pool tests are skipped for multiplexed sessions",
isMultiplexedSessionsEnabledForRW());
try (Spanner spanner =
SpannerOptions.newBuilder()
.setProjectId(TEST_PROJECT)
Expand Down Expand Up @@ -5450,4 +5479,11 @@ private boolean isMultiplexedSessionsEnabled() {
}
return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession();
}

private boolean isMultiplexedSessionsEnabledForRW() {
if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) {
return false;
}
return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ public void setUp() {
.setChannelProvider(channelProvider)
.setCredentials(NoCredentials.getInstance())
.setTrackTransactionStarter()
// The extra BeginTransaction RPC for multiplexed session read-write is causing
// unexpected behavior in tests having a mock on the BeginTransaction RPC. Therefore,
// this is being skipped.
.setSessionPoolOption(
SessionPoolOptions.newBuilder()
.setSkipVerifyingBeginTransactionForMuxRW(true)
.build())
.build()
.getService();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import com.google.spanner.v1.PartitionReadRequest;
import com.google.spanner.v1.PartitionResponse;
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.ResultSet;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.ResultSetStats;
Expand Down Expand Up @@ -1829,7 +1830,7 @@ private ByteString getTransactionId(Session session, TransactionSelector tx) {
transactionId = null;
break;
case BEGIN:
transactionId = beginTransaction(session, tx.getBegin(), null).getId();
transactionId = beginTransaction(session, tx.getBegin(), null, null).getId();
break;
case ID:
Transaction transaction = transactions.get(tx.getId());
Expand Down Expand Up @@ -1895,7 +1896,8 @@ public void beginTransaction(
beginTransactionExecutionTime.simulateExecutionTime(
exceptions, stickyGlobalExceptions, freezeLock);
Transaction transaction =
beginTransaction(session, request.getOptions(), request.getMutationKey());
beginTransaction(
session, request.getOptions(), request.getMutationKey(), request.getRequestOptions());
responseObserver.onNext(transaction);
responseObserver.onCompleted();
} catch (StatusRuntimeException t) {
Expand All @@ -1906,7 +1908,10 @@ public void beginTransaction(
}

private Transaction beginTransaction(
Session session, TransactionOptions options, com.google.spanner.v1.Mutation mutationKey) {
Session session,
TransactionOptions options,
com.google.spanner.v1.Mutation mutationKey,
RequestOptions requestOptions) {
ByteString transactionId = generateTransactionName(session.getName());
Transaction.Builder builder = Transaction.newBuilder().setId(transactionId);
if (options != null && options.getModeCase() == ModeCase.READ_ONLY) {
Expand All @@ -1920,12 +1925,17 @@ private Transaction beginTransaction(
}
Transaction transaction = builder.build();
transactions.put(transaction.getId(), transaction);
transactionsStarted.add(transaction.getId());
// TODO: remove once UNIMPLEMENTED error is not thrown for read-write mux
// Do not consider the transaction if this request was from background thread
if (requestOptions == null
|| !requestOptions.getTransactionTag().equals("multiplexed-rw-background-begin-txn")) {
transactionsStarted.add(transaction.getId());
if (abortNextTransaction.getAndSet(false)) {
markAbortedTransaction(transaction.getId());
}
}
isPartitionedDmlTransaction.put(
transaction.getId(), options.getModeCase() == ModeCase.PARTITIONED_DML);
if (abortNextTransaction.getAndSet(false)) {
markAbortedTransaction(transaction.getId());
}
return transaction;
}

Expand Down Expand Up @@ -2025,7 +2035,8 @@ public void commit(CommitRequest request, StreamObserver<CommitResponse> respons
TransactionOptions.newBuilder()
.setReadWrite(ReadWrite.getDefaultInstance())
.build(),
null);
null,
request.getRequestOptions());
} else if (request.getTransactionId() != null) {
transaction = transactions.get(request.getTransactionId());
Optional<Boolean> aborted =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public void createSpannerInstance() {
SessionPoolOptions.newBuilder()
.setWaitForMinSessionsDuration(Duration.ofSeconds(5L))
.setFailOnSessionLeak()
.setSkipVerifyingBeginTransactionForMuxRW(true)
.build())
.setEnableApiTracing(true)
.build()
Expand Down Expand Up @@ -428,6 +429,7 @@ public boolean isEnableApiTracing() {
SessionPoolOptions.newBuilder()
.setWaitForMinSessionsDuration(Duration.ofSeconds(5L))
.setFailOnSessionLeak()
.setSkipVerifyingBeginTransactionForMuxRW(true)
.build())
.build()
.getService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public void createSpannerInstance() {
SessionPoolOptions.newBuilder()
.setWaitForMinSessionsDuration(Duration.ofSeconds(5L))
.setFailOnSessionLeak()
.setSkipVerifyingBeginTransactionForMuxRW(true)
.build())
// Setting this to false so that Spanner Options does not register Metrics Tracer
// factory again.
Expand Down
Loading

0 comments on commit c74fe83

Please sign in to comment.