Skip to content

Commit

Permalink
Extract Database.runTransaction logic for retry, then to properly end…
Browse files Browse the repository at this point in the history
… span
  • Loading branch information
odeke-em committed Oct 18, 2024
1 parent dbea104 commit 7bdc173
Showing 1 changed file with 34 additions and 19 deletions.
53 changes: 34 additions & 19 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3212,6 +3212,24 @@ class Database extends common.GrpcServiceObject {
? (optionsOrRunFn as RunTransactionOptions)
: {};

const retry = (span: Span) => {
this.runTransaction(options, (err, txn) => {
if (err) {
setSpanError(span, err);
runFn!(err, null);
return;
}

txn!.once('end', () => {
span.end();
});
txn!.once('error', () => {
span.end();
});
runFn!(null, txn!);
});
};

startTrace('Database.runTransaction', this._traceConfig, span => {
this.pool_.getSession(async (err, session?, transaction?) => {
if (err) {
Expand All @@ -3222,11 +3240,7 @@ class Database extends common.GrpcServiceObject {
span.addEvent('No session available', {
'session.id': session?.id,
});
// In this case we are invoking runTransaction afresh
// hence we have to wait for this call to complete before
// ending the span.
await this.runTransaction(options, runFn!);
span.end();
retry(span);
return;
}

Expand All @@ -3246,24 +3260,26 @@ class Database extends common.GrpcServiceObject {
transaction!.excludeTxnFromChangeStreams();
}

// Our span should only be ended if the
// transaction either errored or was ended.
transaction!.once('error', err => {
setSpanError(span, err!);
span.end();
});

transaction!.once('end', err => {
setSpanError(span, err!);
span.end();
});

const release = () => {
this.pool_.release(session!);
};

const runner = new TransactionRunner(
session!,
transaction!,
async (err, resp) => {
if (err) {
setSpanError(span, err!);
}
// It is paramount that we await
// the caller to return before
// exiting this function otherwise the span
// order will not be correct.
await runFn!(err, resp);
span.end();
},
runFn!,
options
);

Expand All @@ -3275,13 +3291,12 @@ class Database extends common.GrpcServiceObject {
'session.id': session?.id,
});
release();
await this.runTransaction(options, runFn!);
retry(span);
} else {
setImmediate(runFn!, err);
release();
span.end();
}

span.end();
});
});
});
Expand Down

0 comments on commit 7bdc173

Please sign in to comment.