From 62bf90575577173c22e339e190843e1cd7312185 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Sat, 12 Oct 2024 02:24:12 -0700 Subject: [PATCH] feat(observability): fix bugs found from product review + negative cases This change adds recording of retry span annotations, catching cases in which exceptions were thrown but spans were not ended while testing out and visually confirming the results. --- src/database.ts | 117 ++++++++++++++++++++++---------------- src/instrument.ts | 36 ++++++++++-- src/transaction-runner.ts | 34 +++++++++-- src/transaction.ts | 37 ++++++------ 4 files changed, 146 insertions(+), 78 deletions(-) diff --git a/src/database.ts b/src/database.ts index 9a2b703a0..7169d3988 100644 --- a/src/database.ts +++ b/src/database.ts @@ -2815,6 +2815,10 @@ class Database extends common.GrpcServiceObject { this.runStream(query, options) .on('error', err => { setSpanError(span, err); + console.log( + `\x1b[34mDatabase.run.error: ${err} isRecording: ${span.isRecording()}\x1b[00m` + ); + span.end(); callback!(err as grpc.ServiceError, rows, stats, metadata); }) .on('response', response => { @@ -3060,7 +3064,7 @@ class Database extends common.GrpcServiceObject { dataStream .once('data', () => (dataReceived = true)) .once('error', err => { - setSpanError(span, err); + setSpanErrorAndException(span, err as Error); if ( !dataReceived && @@ -3222,8 +3226,8 @@ class Database extends common.GrpcServiceObject { span.addEvent('No session available', { 'session.id': session?.id, }); - this.runTransaction(options, runFn!); span.end(); + this.runTransaction(options, runFn!); return; } @@ -3242,8 +3246,8 @@ class Database extends common.GrpcServiceObject { } const release = () => { - span.end(); this.pool_.release(session!); + span.end(); }; const runner = new TransactionRunner( @@ -3253,28 +3257,34 @@ class Database extends common.GrpcServiceObject { if (err) { setSpanError(span, err!); } - span.end(); runFn!(err, resp); }, options ); runner.run().then(release, 
err => { - if (err) { - setSpanError(span, err!); - } + setSpanError(span, err); if (isSessionNotFoundError(err)) { span.addEvent('No session available', { 'session.id': session?.id, }); + span.addEvent('Retrying'); release(); - this.runTransaction(options, runFn!); + this.runTransaction( + options, + ( + err: ServiceError | null, + txn: Transaction | null | undefined + ) => { + runFn!(err, txn); + } + ); } else { - if (!err) { - span.addEvent('Using Session', {'session.id': session!.id}); - } - setImmediate(runFn!, err); + span.addEvent('Using Session', {'session.id': session!.id}); + setImmediate((err: null | ServiceError) => { + runFn!(err); + }, err); release(); } }); @@ -3363,46 +3373,55 @@ class Database extends common.GrpcServiceObject { let sessionId = ''; const getSession = this.pool_.getSession.bind(this.pool_); - const span = getActiveOrNoopSpan(); - // Loop to retry 'Session not found' errors. - // (and yes, we like while (true) more than for (;;) here) - // eslint-disable-next-line no-constant-condition - while (true) { - try { - const [session, transaction] = await promisify(getSession)(); - transaction.requestOptions = Object.assign( - transaction.requestOptions || {}, - options.requestOptions - ); - if (options.optimisticLock) { - transaction.useOptimisticLock(); - } - if (options.excludeTxnFromChangeStreams) { - transaction.excludeTxnFromChangeStreams(); - } - sessionId = session?.id; - span.addEvent('Using Session', {'session.id': sessionId}); - const runner = new AsyncTransactionRunner( - session, - transaction, - runFn, - options - ); - try { - return await runner.run(); - } finally { - this.pool_.release(session); - } - } catch (e) { - if (!isSessionNotFoundError(e as ServiceError)) { - span.addEvent('No session available', { - 'session.id': sessionId, - }); - throw e; + return startTrace( + 'Database.runTransactionAsync', + this._traceConfig, + span => { + // Loop to retry 'Session not found' errors. 
+ // (and yes, we like while (true) more than for (;;) here) + // eslint-disable-next-line no-constant-condition + while (true) { + try { + const [session, transaction] = await promisify(getSession)(); + transaction.requestOptions = Object.assign( + transaction.requestOptions || {}, + options.requestOptions + ); + if (options.optimisticLock) { + transaction.useOptimisticLock(); + } + if (options.excludeTxnFromChangeStreams) { + transaction.excludeTxnFromChangeStreams(); + } + sessionId = session?.id; + span.addEvent('Using Session', {'session.id': sessionId}); + const runner = new AsyncTransactionRunner( + session, + transaction, + runFn, + options + ); + + try { + const result = await runner.run(); + span.end(); + return result; + } finally { + this.pool_.release(session); + } + } catch (e) { + if (!isSessionNotFoundError(e as ServiceError)) { + span.addEvent('No session available', { + 'session.id': sessionId, + }); + setSpanErrorAndException(span, e as Error); + throw e; + } + } } } - } + ); } /** diff --git a/src/instrument.ts b/src/instrument.ts index 99b260bf4..bebac663d 100644 --- a/src/instrument.ts +++ b/src/instrument.ts @@ -118,6 +118,8 @@ function ensureInitialContextManagerSet() { } } +const debugTraces = process.env.SPANNER_DEBUG_TRACES === 'true'; + /** * startTrace begins an active span in the current active context * and passes it back to the set callback function. Each span will @@ -142,6 +144,10 @@ export function startTrace( SPAN_NAMESPACE_PREFIX + '.' 
+ spanNameSuffix, {kind: SpanKind.CLIENT}, span => { + if (debugTraces) { + patchSpanEndForDebugging(span); + } + span.setAttribute(SEMATTRS_DB_SYSTEM, 'spanner'); span.setAttribute(ATTR_OTEL_SCOPE_NAME, TRACER_NAME); span.setAttribute(ATTR_OTEL_SCOPE_VERSION, TRACER_VERSION); @@ -165,11 +171,20 @@ export function startTrace( } } - if (config.that) { - const fn = cb.bind(config.that); - return fn(span); - } else { - return cb(span); + // If at all the invoked function throws an exception, + // record the exception and then end this span. + try { + if (config.that) { + const fn = cb.bind(config.that); + return fn(span); + } else { + return cb(span); + } + } catch (e) { + setSpanErrorAndException(span, e as Error); + span.end(); + // Finally re-throw the exception. + throw e; } } ); @@ -289,3 +304,14 @@ class noopSpan implements Span { return this; } } + +function patchSpanEndForDebugging(span: Span) { + const origSpanEnd = span.end; + const wrapSpanEnd = function (this: Span) { + console.trace(`\x1b[35m${spanNameSuffix}.end()\x1b[00m`); + return origSpanEnd.apply(this); + }; + Object.defineProperty(span, 'end', { + value: wrapSpanEnd, + }); +} diff --git a/src/transaction-runner.ts b/src/transaction-runner.ts index 61d979e8c..521b3e862 100644 --- a/src/transaction-runner.ts +++ b/src/transaction-runner.ts @@ -26,6 +26,7 @@ import {isSessionNotFoundError} from './session-pool'; import {Database} from './database'; import {google} from '../protos/protos'; import IRequestOptions = google.spanner.v1.IRequestOptions; +import {getActiveOrNoopSpan, setSpanErrorAndException} from './instrument'; // eslint-disable-next-line @typescript-eslint/no-var-requires const jsonProtos = require('../protos/protos.json'); @@ -224,6 +225,7 @@ export abstract class Runner { async run(): Promise { const start = Date.now(); const timeout = this.options.timeout!; + const span = getActiveOrNoopSpan(); let lastError: grpc.ServiceError; @@ -233,8 +235,18 @@ export abstract class Runner { const 
transaction = await this.getTransaction(); try { - return await this._run(transaction); + const result = await this._run(transaction); + if (this.attempts > 0) { + // No need to annotate if the transaction wasn't retried. + span.addEvent('Transaction Attempt Succeeded', { + attempt: this.attempts + 1, + }); + } + return result; } catch (e) { + span.addEvent('Transaction Attempt Failed', { + attempt: this.attempts + 1, + }); this.session.lastError = e as grpc.ServiceError; lastError = e as grpc.ServiceError; } @@ -243,19 +255,29 @@ export abstract class Runner { // thrown here. We do this to bubble this error up to the caller who is // responsible for retrying the transaction on a different session. if ( - !RETRYABLE.includes(lastError.code!) && - !isRetryableInternalError(lastError) + !RETRYABLE.includes(lastError!.code!) && + !isRetryableInternalError(lastError!) ) { - throw lastError; + span.addEvent('Transaction Attempt Aborted', { + attempt: this.attempts + 1, + }); + setSpanErrorAndException(span, lastError!); + throw lastError!; } this.attempts += 1; - const delay = this.getNextDelay(lastError); + const delay = this.getNextDelay(lastError!); + span.addEvent('Backing off', {delay: delay, attempt: this.attempts}); await new Promise(resolve => setTimeout(resolve, delay)); } - throw new DeadlineError(lastError!); + span.addEvent('Transaction Attempt Aborted due to Deadline Error', { + total_attempts: this.attempts + 1, + }); + const err = new DeadlineError(lastError!); + setSpanErrorAndException(span, err); + throw err; } } diff --git a/src/transaction.ts b/src/transaction.ts index e7993e74c..d646a1c89 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -757,12 +757,6 @@ export class Snapshot extends EventEmitter { this.begin(); } setSpanError(span, err); - }) - .on('end', err => { - if (err) { - setSpanError(span, err); - } - span.end(); }); if (resultStream instanceof Stream) { @@ -1288,7 +1282,6 @@ export class Snapshot extends EventEmitter { } catch 
(e) { const errorStream = new PassThrough(); setSpanErrorAndException(span, e as Error); - span.end(); setImmediate(() => errorStream.destroy(e as Error)); return errorStream; } @@ -1332,12 +1325,6 @@ export class Snapshot extends EventEmitter { ) { this.begin(); } - }) - .on('end', err => { - if (err) { - setSpanError(span, err as Error); - } - span.end(); }); if (resultStream instanceof Stream) { @@ -2112,15 +2099,29 @@ export class Transaction extends Dml { } else if (!this._useInRunner) { reqOpts.singleUseTransaction = this._options; } else { - this.begin().then(() => { - this.commit(options, (err, resp) => { - if (err) { + this.begin() + .then( + () => { + this.commit(options, (err, resp) => { + if (err) { + setSpanError(span, err); + } + span.end(); + callback(err, resp); + }); + }, + err => { setSpanError(span, err); + callback(err); + span.end(); } + ) + .catch(err => { + setSpanErrorAndException(span, err as Error); span.end(); - callback(err, resp); + // Re-throw the exception after recording it. + throw err; }); - }, callback); return; }