From 7a96270f44be66bf00f2cca62d810c23444537d9 Mon Sep 17 00:00:00 2001 From: Sakthivel Subramanian Date: Mon, 6 Jan 2025 19:55:49 +0530 Subject: [PATCH] Add logs to find the issue at root --- .../cloud/spanner/AsyncResultSetImpl.java | 19 ++++++++++--------- .../cloud/spanner/MockSpannerServiceImpl.java | 2 ++ .../RetryOnInvalidatedSessionTest.java | 5 +++++ 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java index 26b4272fe36..c68b75575b0 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java @@ -470,14 +470,14 @@ public void run() { // Non-streaming result sets do not trigger this callback, and for those result sets, we // need to eagerly start the ProduceRowsRunnable. synchronized (monitor) { - if (state == State.STREAMING_IN_PROGRESS - || state == State.RUNNING - || state == State.CONSUMING) { - return; - } - if (state == State.STREAMING_INITIALIZED) { - state = State.STREAMING_IN_PROGRESS; - } +// if (state == State.STREAMING_IN_PROGRESS +// || state == State.RUNNING +// || state == State.CONSUMING) { +// return; +// } +// if (state == State.STREAMING_INITIALIZED) { +// state = State.STREAMING_IN_PROGRESS; +// } if (!initiateStreaming(AsyncResultSetImpl.this)) { initiateProduceRows(); } @@ -492,6 +492,7 @@ public void run() { /** Sets the callback for this {@link AsyncResultSet}. */ @Override public ApiFuture setCallback(Executor exec, ReadyCallback cb) { + System.out.println("Inside setCallback"); synchronized (monitor) { Preconditions.checkState(!closed, "This AsyncResultSet has been closed"); Preconditions.checkState( @@ -660,7 +661,7 @@ public void onStreamMessage(PartialResultSet partialResultSet, boolean bufferIsF !partialResultSet.getResumeToken().isEmpty() || bufferIsFull || partialResultSet == GrpcStreamIterator.END_OF_STREAM; - if (startJobThread || state != State.STREAMING_IN_PROGRESS) { + if (startJobThread || state != State.STREAMING_INITIALIZED) { initiateProduceRows(); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 9f27b28d323..17cbfa87e29 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -651,6 +651,7 @@ private ByteString generateTransactionName(String session) { counter = new AtomicLong(); transactionCounters.put(session, counter); } + System.out.printf("Generating session using Session ID %s\n", session); return ByteString.copyFromUtf8( String.format("%s/transactions/%d", session, counter.incrementAndGet())); } @@ -1999,6 +2000,7 @@ private void ensureMostRecentTransaction(Session session, ByteString transaction if (index > -1) { long id = Long.parseLong(transactionId.toStringUtf8().substring(index + 1)); if (id != counter.get()) { + System.out.printf("Session ID %s TransactionId %s\n", session.getName(), transactionId.toStringUtf8()); throw Status.FAILED_PRECONDITION .withDescription( String.format( diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java index 5496ad531bf..bc5f69f15b7 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java @@ -1434,6 +1434,8 @@ private void asyncRunner_withReadFunction( runner.runAsync( txn -> { AsyncResultSet rs = readFunction.apply(txn); + System.out.println(rs); + System.out.println("Creating a new AsyncResultSet"); ApiFuture fut = rs.setCallback( queryExecutor, @@ -1450,6 +1452,7 @@ private void asyncRunner_withReadFunction( } } }); + System.out.println("After setCallback"); return ApiFutures.transform( fut, input -> counter.get(), MoreExecutors.directExecutor()); }, @@ -1573,6 +1576,8 @@ private void asyncTransactionManager_readAsync( context.then( (transaction, ignored) -> { AsyncResultSet rs = fn.apply(transaction); + System.out.println(rs); + System.out.println("1. Creating a new AsyncResultSet"); ApiFuture fut = rs.setCallback( queryExecutor,