Skip to content

Commit

Permalink
Add logs to find the issue at root
Browse files Browse the repository at this point in the history
  • Loading branch information
sakthivelmanii committed Jan 6, 2025
1 parent 21733d5 commit 7a96270
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -470,14 +470,14 @@ public void run() {
// Non-streaming result sets do not trigger this callback, and for those result sets, we
// need to eagerly start the ProduceRowsRunnable.
synchronized (monitor) {
if (state == State.STREAMING_IN_PROGRESS
|| state == State.RUNNING
|| state == State.CONSUMING) {
return;
}
if (state == State.STREAMING_INITIALIZED) {
state = State.STREAMING_IN_PROGRESS;
}
// if (state == State.STREAMING_IN_PROGRESS
// || state == State.RUNNING
// || state == State.CONSUMING) {
// return;
// }
// if (state == State.STREAMING_INITIALIZED) {
// state = State.STREAMING_IN_PROGRESS;
// }
if (!initiateStreaming(AsyncResultSetImpl.this)) {
initiateProduceRows();
}
Expand All @@ -492,6 +492,7 @@ public void run() {
/** Sets the callback for this {@link AsyncResultSet}. */
@Override
public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
System.out.println("Inside setCallback");
synchronized (monitor) {
Preconditions.checkState(!closed, "This AsyncResultSet has been closed");
Preconditions.checkState(
Expand Down Expand Up @@ -660,7 +661,7 @@ public void onStreamMessage(PartialResultSet partialResultSet, boolean bufferIsF
!partialResultSet.getResumeToken().isEmpty()
|| bufferIsFull
|| partialResultSet == GrpcStreamIterator.END_OF_STREAM;
if (startJobThread || state != State.STREAMING_IN_PROGRESS) {
if (startJobThread || state != State.STREAMING_INITIALIZED) {
initiateProduceRows();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,7 @@ private ByteString generateTransactionName(String session) {
counter = new AtomicLong();
transactionCounters.put(session, counter);
}
System.out.printf("Generating session using Session ID %s\n", session);
return ByteString.copyFromUtf8(
String.format("%s/transactions/%d", session, counter.incrementAndGet()));
}
Expand Down Expand Up @@ -1999,6 +2000,7 @@ private void ensureMostRecentTransaction(Session session, ByteString transaction
if (index > -1) {
long id = Long.parseLong(transactionId.toStringUtf8().substring(index + 1));
if (id != counter.get()) {
System.out.printf("Session ID %s TransactionId %s\n", session.getName(), transactionId.toStringUtf8());
throw Status.FAILED_PRECONDITION
.withDescription(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1434,6 +1434,8 @@ private void asyncRunner_withReadFunction(
runner.runAsync(
txn -> {
AsyncResultSet rs = readFunction.apply(txn);
System.out.println(rs);
System.out.println("Creating a new AsyncResultSet");
ApiFuture<Void> fut =
rs.setCallback(
queryExecutor,
Expand All @@ -1450,6 +1452,7 @@ private void asyncRunner_withReadFunction(
}
}
});
System.out.println("After setCallback");
return ApiFutures.transform(
fut, input -> counter.get(), MoreExecutors.directExecutor());
},
Expand Down Expand Up @@ -1573,6 +1576,8 @@ private void asyncTransactionManager_readAsync(
context.then(
(transaction, ignored) -> {
AsyncResultSet rs = fn.apply(transaction);
System.out.println(rs);
System.out.println("1. Creating a new AsyncResultSet");
ApiFuture<Void> fut =
rs.setCallback(
queryExecutor,
Expand Down

0 comments on commit 7a96270

Please sign in to comment.