From 9758bd79e4cf487e842b5aec4810e96cbb0fa39d Mon Sep 17 00:00:00 2001 From: Sakthivel Subramanian Date: Tue, 7 Jan 2025 01:36:14 +0530 Subject: [PATCH] verify fix with adding latency --- .../cloud/spanner/AsyncResultSetImpl.java | 5 ++++ .../spanner/ResumableStreamIterator.java | 24 +++++++++---------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java index b23bac8ea7..680c9afe48 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java @@ -516,6 +516,11 @@ private void initiateProduceRows() { } produceRowsInitiated = true; } + try { + Thread.sleep(5L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } this.service.execute(new ProduceRowsRunnable()); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java index 347af0d867..b46fd599f1 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java @@ -44,7 +44,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -71,8 +70,7 @@ abstract class ResumableStreamIterator extends AbstractIterator> streamCache = - new AtomicReference<>(); + private volatile CloseableIterator stream; private ByteString resumeToken; private boolean finished; /** @@ -215,16 +213,16 @@ boolean prepareIteratorForRetryOnDifferentGrpcChannel() { @Override public void close(@Nullable String message) { - if (streamCache.get() != null) { - streamCache.get().close(message); + if (stream != null) { + stream.close(message); span.end(); - streamCache.set(null); + stream = null; } } @Override public boolean isWithBeginTransaction() { - return streamCache.get() != null && streamCache.get().isWithBeginTransaction(); + return stream != null && stream.isWithBeginTransaction(); } @Override @@ -248,8 +246,8 @@ protected PartialResultSet computeNext() { return buffer.pop(); } try { - if (streamCache.get().hasNext()) { - PartialResultSet next = streamCache.get().next(); + if (stream.hasNext()) { + PartialResultSet next = stream.next(); boolean hasResumeToken = !next.getResumeToken().isEmpty(); if (hasResumeToken) { resumeToken = next.getResumeToken(); @@ -283,7 +281,7 @@ protected PartialResultSet computeNext() { buffer.removeLast(); } assert buffer.isEmpty() || buffer.getLast().getResumeToken().equals(resumeToken); - streamCache.set(null); + stream = null; try (IScope s = tracer.withSpan(span)) { long delay = spannerException.getRetryDelayInMillis(); if (delay != -1) { @@ -304,7 +302,7 @@ protected PartialResultSet computeNext() { if (translated instanceof RetryOnDifferentGrpcChannelException) { if (++numAttemptsOnOtherChannel < errorHandler.getMaxAttempts() && prepareIteratorForRetryOnDifferentGrpcChannel()) { - streamCache.set(null); + stream = null; continue; } } @@ -325,7 +323,7 @@ private void startGrpcStreaming() { System.out.printf( "[%s][%s] Inside startGrpcStreaming\n", OffsetDateTime.now(), Thread.currentThread().getName()); - if (streamCache.get() == null) { + if (stream == null) { span.addAnnotation( "Starting/Resuming stream", "ResumeToken", @@ -336,7 +334,7 @@ private void startGrpcStreaming() { System.out.printf( "[%s][%s] Inside creating stream\n", OffsetDateTime.now(), Thread.currentThread().getName()); - streamCache.set(checkNotNull(startStream(resumeToken, streamMessageListener))); + stream = checkNotNull(startStream(resumeToken, streamMessageListener)); } } // }