From 1f5c03114dad8ea623e1cc8079a04be874370162 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Sat, 21 Dec 2024 06:02:23 -0800 Subject: [PATCH] feat(debugging): implement x-goog-spanner-request-id propagation per request Implements propagation of the x-goog-spanner-request-id that'll be propagated for every call. Once an error has been encountered, that error will have `.requestId` set. Fixes #2200 --- src/batch-transaction.ts | 29 ++++++++++---- src/database.ts | 5 +++ src/index.ts | 13 +++++++ src/request_id_header.ts | 80 +++++++++++++++++++++++++++++++++++++++ src/session.ts | 53 ++++++++++++++++++++++++-- src/transaction.ts | 25 ++++++++++-- test/request_id_header.ts | 35 +++++++++++++++++ 7 files changed, 226 insertions(+), 14 deletions(-) create mode 100644 src/request_id_header.ts create mode 100644 test/request_id_header.ts diff --git a/src/batch-transaction.ts b/src/batch-transaction.ts index d182d4429..03c63a4e2 100644 --- a/src/batch-transaction.ts +++ b/src/batch-transaction.ts @@ -146,6 +146,8 @@ class BatchTransaction extends Snapshot { 'BatchTransaction.createQueryPartitions', traceConfig, span => { + const database = this.session.parent as Database; + const nthRequest = database._nextNthRequest(); const headers: {[k: string]: string} = {}; if (this._getSpanner().routeToLeaderEnabled) { addLeaderAwareRoutingHeader(headers); @@ -157,7 +159,11 @@ class BatchTransaction extends Snapshot { method: 'partitionQuery', reqOpts, gaxOpts: query.gaxOptions, - headers: headers, + headers: this.session._metadataWithRequestId( + nthRequest, + 1, + headers + ), }, (err, partitions, resp) => { if (err) { @@ -201,11 +207,16 @@ class BatchTransaction extends Snapshot { transaction: {id: this.id}, }); config.reqOpts = extend({}, query); - config.headers = { - [CLOUD_RESOURCE_HEADER]: (this.session.parent as Database) - .formattedName_, + const database = this.session.parent as Database; + const headers = { + [CLOUD_RESOURCE_HEADER]: database.formattedName_, }; - delete query.partitionOptions; + (config.headers = this.session._metadataWithRequestId( + database._nextNthRequest(), + 1, + headers + )), + delete query.partitionOptions; this.session.request(config, (err, resp) => { if (err) { setSpanError(span, err); @@ -286,14 +297,18 @@ class BatchTransaction extends Snapshot { if (this._getSpanner().routeToLeaderEnabled) { addLeaderAwareRoutingHeader(headers); } - + const database = this.session.parent as Database; this.createPartitions_( { client: 'SpannerClient', method: 'partitionRead', reqOpts, gaxOpts: options.gaxOptions, - headers: headers, + headers: this.session._metadataWithRequestId( + database._nextNthRequest(), + 1, + headers + ), }, (err, partitions, resp) => { if (err) { diff --git a/src/database.ts b/src/database.ts index e8b91480f..12ec102e6 100644 --- a/src/database.ts +++ b/src/database.ts @@ -2917,6 +2917,11 @@ class Database extends common.GrpcServiceObject { }); } + public _nextNthRequest(): number { + const spanner = this.parent as Spanner; + return spanner._nextNthRequest(); + } + /** * Create a readable object stream to receive resulting rows from a SQL * statement. diff --git a/src/index.ts b/src/index.ts index 0bbdccd01..719ba8675 100644 --- a/src/index.ts +++ b/src/index.ts @@ -84,6 +84,11 @@ import { ObservabilityOptions, ensureInitialContextManagerSet, } from './instrument'; +import { + AtomicCounter, + newAtomicCounter, + nextSpannerClientId, +} from './request_id_header'; // eslint-disable-next-line @typescript-eslint/no-var-requires const gcpApiConfig = require('./spanner_grpc_config.json'); @@ -246,6 +251,8 @@ class Spanner extends GrpcService { routeToLeaderEnabled = true; directedReadOptions: google.spanner.v1.IDirectedReadOptions | null; _observabilityOptions: ObservabilityOptions | undefined; + private _nthClientId: number; + private _nthRequest: AtomicCounter; /** * Placeholder used to auto populate a column with the commit timestamp. @@ -374,6 +381,12 @@ class Spanner extends GrpcService { this.directedReadOptions = directedReadOptions; this._observabilityOptions = options.observabilityOptions; ensureInitialContextManagerSet(); + this._nthClientId = nextSpannerClientId(); + this._nthRequest = newAtomicCounter(0); + } + + _nextNthRequest(): number { + return this._nthRequest.increment(); } /** diff --git a/src/request_id_header.ts b/src/request_id_header.ts new file mode 100644 index 000000000..45491dbf4 --- /dev/null +++ b/src/request_id_header.ts @@ -0,0 +1,80 @@ +/*! + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {randomBytes} from 'crypto'; +const randIdForProcess = randomBytes(8).readBigUint64LE(0).toString(); +const X_GOOG_SPANNER_REQUEST_ID_HEADER = 'x-goog-spanner-request-id'; + +class AtomicCounter { + private backingBuffer: BigInt64Array; + + constructor(initialValue?: number) { + this.backingBuffer = new BigInt64Array( + new SharedArrayBuffer(BigInt64Array.BYTES_PER_ELEMENT) + ); + if (initialValue) { + this.increment(initialValue); + } + } + + public increment(n?: number): number { + if (!n) { + n = 1; + } + Atomics.store(this.backingBuffer, 0, BigInt(n)); + return this.value(); + } + + public value(): number { + return Number(Atomics.load(this.backingBuffer, 0)); + } + + public toString(): string { + return `${this.value()}`; + } +} + +function craftRequestId( + nthClientId: number, + channelId: number, + nthRequest: number, + attempt: number +) { + return `1.${randIdForProcess}.${nthClientId}.${channelId}.${nthRequest}.${attempt}`; +} + +const nthClientId = new AtomicCounter(); + +/* + * nextSpannerClientId increments the internal + * counter for created SpannerClients, for use + * with x-goog-spanner-request-id. + */ +function nextSpannerClientId(): number { + return nthClientId.increment(1); +} + +function newAtomicCounter(n?: number): AtomicCounter { + return new AtomicCounter(n); +} + +export { + AtomicCounter, + X_GOOG_SPANNER_REQUEST_ID_HEADER, + craftRequestId, + nextSpannerClientId, + newAtomicCounter, +}; diff --git a/src/session.ts b/src/session.ts index 2ece6b45d..7783b1d3a 100644 --- a/src/session.ts +++ b/src/session.ts @@ -44,7 +44,10 @@ import { import {grpc, CallOptions} from 'google-gax'; import IRequestOptions = google.spanner.v1.IRequestOptions; import {Spanner} from '.'; - +import { + X_GOOG_SPANNER_REQUEST_ID_HEADER, + craftRequestId, +} from './request_id_header'; export type GetSessionResponse = [Session, r.Response]; /** @@ -316,13 +319,18 @@ export class Session extends common.GrpcServiceObject { const reqOpts = { name: this.formattedName_, }; + const database = this.parent as Database; return this.request( { client: 'SpannerClient', method: 'deleteSession', reqOpts, gaxOpts, - headers: this.resourceHeader_, + headers: this._metadataWithRequestId( + database._nextNthRequest(), + 1, + this.resourceHeader_ + ), }, callback! ); @@ -388,13 +396,18 @@ export class Session extends common.GrpcServiceObject { if (this._getSpanner().routeToLeaderEnabled) { addLeaderAwareRoutingHeader(headers); } + const database = this.parent as Database; return this.request( { client: 'SpannerClient', method: 'getSession', reqOpts, gaxOpts, - headers: headers, + headers: this._metadataWithRequestId( + database._nextNthRequest(), + 1, + headers + ), }, (err, resp) => { if (resp) { @@ -440,13 +453,19 @@ export class Session extends common.GrpcServiceObject { session: this.formattedName_, sql: 'SELECT 1', }; + + const database = this.parent as Database; return this.request( { client: 'SpannerClient', method: 'executeSql', reqOpts, gaxOpts, - headers: this.resourceHeader_, + headers: this._metadataWithRequestId( + database._nextNthRequest(), + 1, + this.resourceHeader_ + ), }, callback! ); @@ -533,6 +552,32 @@ export class Session extends common.GrpcServiceObject { private _getSpanner(): Spanner { return this.parent.parent.parent as Spanner; } + + private channelId(): number { + // TODO: Infer channelId from the actual gRPC channel. + return 1; + } + + public _metadataWithRequestId( + nthRequest: number, + attempt: number, + priorMetadata?: {[k: string]: string} + ): {[k: string]: string} { + const database = this.parent as Database; + if (!priorMetadata) { + priorMetadata = {}; + } + const withReqId = { + ...priorMetadata, + }; + withReqId[X_GOOG_SPANNER_REQUEST_ID_HEADER] = craftRequestId( + database._nthClientId, + this.channelId(), + nthRequest, + attempt + ); + return withReqId; + } } /*! Developer Documentation diff --git a/src/transaction.ts b/src/transaction.ts index fa1f10814..2cc7962c9 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -445,6 +445,7 @@ export class Snapshot extends EventEmitter { opts: this._observabilityOptions, dbName: this._dbName!, }; + const database = this.session.parent as Database; return startTrace('Snapshot.begin', traceConfig, span => { span.addEvent('Begin Transaction'); @@ -454,7 +455,11 @@ export class Snapshot extends EventEmitter { method: 'beginTransaction', reqOpts, gaxOpts, - headers: headers, + headers: this.session._metadataWithRequestId( + database._nextNthRequest(), + 1, + headers + ), }, ( err: null | grpc.ServiceError, @@ -711,8 +716,12 @@ export class Snapshot extends EventEmitter { opts: this._observabilityOptions, dbName: this._dbName!, }; + return startTrace('Snapshot.createReadStream', traceConfig, span => { let attempt = 0; + const database = this.session.parent as Database; + const nthRequest = database._nextNthRequest(); + const makeRequest = (resumeToken?: ResumeToken): Readable => { if (this.id && transaction.begin) { delete transaction.begin; @@ -739,7 +748,11 @@ export class Snapshot extends EventEmitter { method: 'streamingRead', reqOpts: Object.assign({}, reqOpts, {resumeToken}), gaxOpts: gaxOptions, - headers: headers, + headers: this.session._metadataWithRequestId( + nthRequest, + attempt, + headers + ), }); }; @@ -1297,6 +1310,8 @@ export class Snapshot extends EventEmitter { }; return startTrace('Snapshot.runStream', traceConfig, span => { let attempt = 0; + const database = this.session.parent as Database; + const nthRequest = database._nextNthRequest(); const makeRequest = (resumeToken?: ResumeToken): Readable => { attempt++; @@ -1330,7 +1345,11 @@ export class Snapshot extends EventEmitter { method: 'executeStreamingSql', reqOpts: Object.assign({}, reqOpts, {resumeToken}), gaxOpts: gaxOptions, - headers: headers, + headers: this.session._metadataWithRequestId( + nthRequest, + attempt, + headers + ), }); }; diff --git a/test/request_id_header.ts b/test/request_id_header.ts new file mode 100644 index 000000000..200c3c5c6 --- /dev/null +++ b/test/request_id_header.ts @@ -0,0 +1,35 @@ +/*! + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* eslint-disable prefer-rest-params */ +import { + AtomicCounter, +} from '../src/request_id_header'; + +describe("AtomicCounter", () => { + it("Constructor with initialValue", done => { + const ac0 = new AtomicCounter(); + assert.strictEqual(ac0.value(), 0); + assert.strictEqual(ac0.increment(2), 2, "increment should return the added value") + assert.strictEqual(ac0.value(), 2, "increment should have modified the value"); + + const ac1 = new AtomicCounter(1); + assert.strictEqual(ac1.value(), 1); + assert.strictEqual(ac0.increment(1<<32), (1<<32)+1, "increment should return the added value") + assert.strictEqual(ac0.value(), (1<<32)+1, "increment should have modified the value"); + done(); + }); +});