From b6969a9f9e07229fa326af0e12b3be3258f5481f Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Sat, 21 Dec 2024 06:02:23 -0800 Subject: [PATCH 01/12] feat(debugging): implement x-goog-spanner-request-id propagation per request Implements propagation of the x-goog-spanner-request-id that'll be propagated for every call. Once an error has been encountered, that error will have `.requestId` set. Fixes #2200 --- src/batch-transaction.ts | 29 ++++++++++---- src/database.ts | 47 ++++++++++++++++++++++- src/index.ts | 3 ++ src/instance.ts | 1 + src/request_id_header.ts | 81 +++++++++++++++++++++++++++++++++++++++ src/session.ts | 38 ++++++++++++++++-- src/transaction.ts | 25 ++++++++++-- test/gapic_spanner_v1.ts | 8 +++- test/request_id_header.ts | 49 +++++++++++++++++++++++ 9 files changed, 264 insertions(+), 17 deletions(-) create mode 100644 src/request_id_header.ts create mode 100644 test/request_id_header.ts diff --git a/src/batch-transaction.ts b/src/batch-transaction.ts index d182d4429..03c63a4e2 100644 --- a/src/batch-transaction.ts +++ b/src/batch-transaction.ts @@ -146,6 +146,8 @@ class BatchTransaction extends Snapshot { 'BatchTransaction.createQueryPartitions', traceConfig, span => { + const database = this.session.parent as Database; + const nthRequest = database._nextNthRequest(); const headers: {[k: string]: string} = {}; if (this._getSpanner().routeToLeaderEnabled) { addLeaderAwareRoutingHeader(headers); @@ -157,7 +159,11 @@ class BatchTransaction extends Snapshot { method: 'partitionQuery', reqOpts, gaxOpts: query.gaxOptions, - headers: headers, + headers: this.session._metadataWithRequestId( + nthRequest, + 1, + headers + ), }, (err, partitions, resp) => { if (err) { @@ -201,11 +207,16 @@ class BatchTransaction extends Snapshot { transaction: {id: this.id}, }); config.reqOpts = extend({}, query); - config.headers = { - [CLOUD_RESOURCE_HEADER]: (this.session.parent as Database) - .formattedName_, + const database = this.session.parent as Database; + const headers = { + [CLOUD_RESOURCE_HEADER]: database.formattedName_, }; - delete query.partitionOptions; + (config.headers = this.session._metadataWithRequestId( + database._nextNthRequest(), + 1, + headers + )), + delete query.partitionOptions; this.session.request(config, (err, resp) => { if (err) { setSpanError(span, err); @@ -286,14 +297,18 @@ class BatchTransaction extends Snapshot { if (this._getSpanner().routeToLeaderEnabled) { addLeaderAwareRoutingHeader(headers); } - + const database = this.session.parent as Database; this.createPartitions_( { client: 'SpannerClient', method: 'partitionRead', reqOpts, gaxOpts: options.gaxOptions, - headers: headers, + headers: this.session._metadataWithRequestId( + database._nextNthRequest(), + 1, + headers + ), }, (err, partitions, resp) => { if (err) { diff --git a/src/database.ts b/src/database.ts index edccf23c1..9f2f2ce9c 100644 --- a/src/database.ts +++ b/src/database.ts @@ -112,6 +112,13 @@ import { setSpanErrorAndException, traceConfig, } from './instrument'; +import { + AtomicCounter, + X_GOOG_SPANNER_REQUEST_ID_HEADER, + craftRequestId, + newAtomicCounter, +} from './request_id_header'; + export type GetDatabaseRolesCallback = RequestCallback< IDatabaseRole, databaseAdmin.spanner.admin.database.v1.IListDatabaseRolesResponse @@ -350,6 +357,8 @@ class Database extends common.GrpcServiceObject { > | null; _observabilityOptions?: ObservabilityOptions; // TODO: exmaine if we can remove it private _traceConfig: traceConfig; + private _nthRequest: AtomicCounter; + public _clientId: number; constructor( instance: Instance, name: string, @@ -483,7 +492,14 @@ class Database extends common.GrpcServiceObject { Object.assign({}, queryOptions), Database.getEnvironmentQueryOptions() ); + this._nthRequest = newAtomicCounter(0); + this._clientId = 0; + } + + _nextNthRequest(): number { + return this._nthRequest.increment(); } + /** * @typedef {array} SetDatabaseMetadataResponse * @property {object} 0 The {@link Database} metadata. @@ -699,7 +715,11 @@ class Database extends common.GrpcServiceObject { method: 'batchCreateSessions', reqOpts, gaxOpts: options.gaxOptions, - headers: headers, + headers: this._metadataWithRequestId( + this._nextNthRequest(), + 1, + headers + ), }, (err, resp) => { if (err) { @@ -723,6 +743,31 @@ class Database extends common.GrpcServiceObject { }); } + private channelId(): number { + // TODO: Infer channelId from the actual gRPC channel. + return 1; + } + + public _metadataWithRequestId( + nthRequest: number, + attempt: number, + priorMetadata?: {[k: string]: string} + ): {[k: string]: string} { + if (!priorMetadata) { + priorMetadata = {}; + } + const withReqId = { + ...priorMetadata, + }; + withReqId[X_GOOG_SPANNER_REQUEST_ID_HEADER] = craftRequestId( + this._clientId, + this.channelId(), + nthRequest, + attempt + ); + return withReqId; + } + /** * Get a reference to a {@link BatchTransaction} object. * diff --git a/src/index.ts b/src/index.ts index f0f88a801..5360f596d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -87,6 +87,7 @@ import { ensureInitialContextManagerSet, ensureContextPropagation, } from './instrument'; +import {AtomicCounter, nextSpannerClientId} from './request_id_header'; // eslint-disable-next-line @typescript-eslint/no-var-requires const gcpApiConfig = require('./spanner_grpc_config.json'); @@ -249,6 +250,7 @@ class Spanner extends GrpcService { routeToLeaderEnabled = true; directedReadOptions: google.spanner.v1.IDirectedReadOptions | null; _observabilityOptions: ObservabilityOptions | undefined; + _nthClientId: number; /** * Placeholder used to auto populate a column with the commit timestamp. @@ -379,6 +381,7 @@ class Spanner extends GrpcService { ); ensureInitialContextManagerSet(); ensureContextPropagation(); + this._nthClientId = nextSpannerClientId(); } /** diff --git a/src/instance.ts b/src/instance.ts index 72257b24c..e15c94c5c 100644 --- a/src/instance.ts +++ b/src/instance.ts @@ -980,6 +980,7 @@ class Instance extends common.GrpcServiceObject { if (!this.databases_.has(key!)) { const db = new Database(this, name, poolOptions, queryOptions); db._observabilityOptions = this._observabilityOptions; + db._clientId = (this.parent as Spanner)._nthClientId; this.databases_.set(key!, db); } return this.databases_.get(key!)!; diff --git a/src/request_id_header.ts b/src/request_id_header.ts new file mode 100644 index 000000000..5f660baf8 --- /dev/null +++ b/src/request_id_header.ts @@ -0,0 +1,81 @@ +/*! + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {randomBytes} from 'crypto'; +const randIdForProcess = randomBytes(8).readBigUint64LE(0).toString(); +const X_GOOG_SPANNER_REQUEST_ID_HEADER = 'x-goog-spanner-request-id'; + +class AtomicCounter { + private backingBuffer: Uint32Array; + + constructor(initialValue?: number) { + this.backingBuffer = new Uint32Array( + new SharedArrayBuffer(Uint32Array.BYTES_PER_ELEMENT) + ); + if (initialValue) { + this.increment(initialValue); + } + } + + public increment(n?: number): number { + if (!n) { + n = 1; + } + Atomics.add(this.backingBuffer, 0, n); + return this.value(); + } + + public value(): number { + return Atomics.load(this.backingBuffer, 0); + } + + public toString(): string { + return `${this.value()}`; + } +} + +function craftRequestId( + nthClientId: number, + channelId: number, + nthRequest: number, + attempt: number +) { + return `1.${randIdForProcess}.${nthClientId}.${channelId}.${nthRequest}.${attempt}`; +} + +const nthClientId = new AtomicCounter(); + +/* + * nextSpannerClientId increments the internal + * counter for created SpannerClients, for use + * with x-goog-spanner-request-id. + */ +function nextSpannerClientId(): number { + nthClientId.increment(1); + return nthClientId.value(); +} + +function newAtomicCounter(n?: number): AtomicCounter { + return new AtomicCounter(n); +} + +export { + AtomicCounter, + X_GOOG_SPANNER_REQUEST_ID_HEADER, + craftRequestId, + nextSpannerClientId, + newAtomicCounter, +}; diff --git a/src/session.ts b/src/session.ts index 32d79d352..d8317693e 100644 --- a/src/session.ts +++ b/src/session.ts @@ -44,7 +44,6 @@ import { import {grpc, CallOptions} from 'google-gax'; import IRequestOptions = google.spanner.v1.IRequestOptions; import {Spanner} from '.'; - export type GetSessionResponse = [Session, r.Response]; /** @@ -317,13 +316,18 @@ export class Session extends common.GrpcServiceObject { const reqOpts = { name: this.formattedName_, }; + const database = this.parent as Database; return this.request( { client: 'SpannerClient', method: 'deleteSession', reqOpts, gaxOpts, - headers: this.commonHeaders_, + headers: database._metadataWithRequestId( + database._nextNthRequest(), + 1, + this.commonHeaders_ + ), }, callback! ); @@ -389,13 +393,18 @@ export class Session extends common.GrpcServiceObject { if (this._getSpanner().routeToLeaderEnabled) { addLeaderAwareRoutingHeader(headers); } + const database = this.parent as Database; return this.request( { client: 'SpannerClient', method: 'getSession', reqOpts, gaxOpts, - headers: headers, + headers: database._metadataWithRequestId( + database._nextNthRequest(), + 1, + headers + ), }, (err, resp) => { if (resp) { @@ -441,17 +450,33 @@ export class Session extends common.GrpcServiceObject { session: this.formattedName_, sql: 'SELECT 1', }; + + const database = this.parent as Database; return this.request( { client: 'SpannerClient', method: 'executeSql', reqOpts, gaxOpts, - headers: this.commonHeaders_, + headers: database._metadataWithRequestId( + database._nextNthRequest(), + 1, + this.commonHeaders_ + ), }, callback! ); } + + public _metadataWithRequestId( + nthRequest: number, + attempt: number, + priorMetadata?: {[k: string]: string} + ): {[k: string]: string} { + const database = this.parent as Database; + return database._metadataWithRequestId(nthRequest, attempt, priorMetadata); + } + /** * Create a PartitionedDml transaction. * @@ -534,6 +559,11 @@ export class Session extends common.GrpcServiceObject { private _getSpanner(): Spanner { return this.parent.parent.parent as Spanner; } + + private channelId(): number { + // TODO: Infer channelId from the actual gRPC channel. + return 1; + } } /*! Developer Documentation diff --git a/src/transaction.ts b/src/transaction.ts index 725ec3235..494191c98 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -446,6 +446,7 @@ export class Snapshot extends EventEmitter { opts: this._observabilityOptions, dbName: this._dbName!, }; + const database = this.session.parent as Database; return startTrace('Snapshot.begin', traceConfig, span => { span.addEvent('Begin Transaction'); @@ -455,7 +456,11 @@ export class Snapshot extends EventEmitter { method: 'beginTransaction', reqOpts, gaxOpts, - headers: headers, + headers: this.session._metadataWithRequestId( + database._nextNthRequest(), + 1, + headers + ), }, ( err: null | grpc.ServiceError, @@ -712,8 +717,12 @@ export class Snapshot extends EventEmitter { opts: this._observabilityOptions, dbName: this._dbName!, }; + return startTrace('Snapshot.createReadStream', traceConfig, span => { let attempt = 0; + const database = this.session.parent as Database; + const nthRequest = database._nextNthRequest(); + const makeRequest = (resumeToken?: ResumeToken): Readable => { if (this.id && transaction.begin) { delete transaction.begin; @@ -740,7 +749,11 @@ export class Snapshot extends EventEmitter { method: 'streamingRead', reqOpts: Object.assign({}, reqOpts, {resumeToken}), gaxOpts: gaxOptions, - headers: headers, + headers: this.session._metadataWithRequestId( + nthRequest, + attempt, + headers + ), }); }; @@ -1298,6 +1311,8 @@ export class Snapshot extends EventEmitter { }; return startTrace('Snapshot.runStream', traceConfig, span => { let attempt = 0; + const database = this.session.parent as Database; + const nthRequest = database._nextNthRequest(); const makeRequest = (resumeToken?: ResumeToken): Readable => { attempt++; @@ -1331,7 +1346,11 @@ export class Snapshot extends EventEmitter { method: 'executeStreamingSql', reqOpts: Object.assign({}, reqOpts, {resumeToken}), gaxOpts: gaxOptions, - headers: headers, + headers: this.session._metadataWithRequestId( + nthRequest, + attempt, + headers + ), }); }; diff --git a/test/gapic_spanner_v1.ts b/test/gapic_spanner_v1.ts index b98138a1a..d2220d051 100644 --- a/test/gapic_spanner_v1.ts +++ b/test/gapic_spanner_v1.ts @@ -481,10 +481,14 @@ describe('v1.SpannerClient', () => { client.innerApiCalls.batchCreateSessions as SinonStub ).getCall(0).args[0]; assert.deepStrictEqual(actualRequest, request); - const actualHeaderRequestParams = ( + const actualHeaders = ( client.innerApiCalls.batchCreateSessions as SinonStub - ).getCall(0).args[1].otherArgs.headers['x-goog-request-params']; + ).getCall(0).args[1].otherArgs.headers; + const actualHeaderRequestParams = actualHeaders['x-goog-request-params']; assert(actualHeaderRequestParams.includes(expectedHeaderRequestParams)); + const actualRequestID = actualHeaders['x-goog-spanner-request-id']; + console.log('headers', actualHeaders); + assert.deepStrictEqual(actualRequestID, 'foo'); }); it('invokes batchCreateSessions without error using callback', async () => { diff --git a/test/request_id_header.ts b/test/request_id_header.ts new file mode 100644 index 000000000..31d28621e --- /dev/null +++ b/test/request_id_header.ts @@ -0,0 +1,49 @@ +/*! + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* eslint-disable prefer-rest-params */ +import {AtomicCounter} from '../src/request_id_header'; + +describe('AtomicCounter', () => { + it('Constructor with initialValue', done => { + const ac0 = new AtomicCounter(); + assert.strictEqual(ac0.value(), 0); + assert.strictEqual( + ac0.increment(2), + 2, + 'increment should return the added value' + ); + assert.strictEqual( + ac0.value(), + 2, + 'increment should have modified the value' + ); + + const ac1 = new AtomicCounter(1); + assert.strictEqual(ac1.value(), 1); + assert.strictEqual( + ac0.increment(1 << 32), + (1 << 32) + 1, + 'increment should return the added value' + ); + assert.strictEqual( + ac0.value(), + (1 << 32) + 1, + 'increment should have modified the value' + ); + done(); + }); +}); From f15c1fc3abe0196b61089a54bb406b34ab39261e Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Sat, 21 Dec 2024 09:08:22 -0800 Subject: [PATCH 02/12] Spring up checks with MockSpanner --- src/database.ts | 23 ++++++++++------------- test/spanner.ts | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/src/database.ts b/src/database.ts index 9f2f2ce9c..b230f7052 100644 --- a/src/database.ts +++ b/src/database.ts @@ -473,6 +473,8 @@ class Database extends common.GrpcServiceObject { }; this.request = instance.request; + this._nthRequest = newAtomicCounter(0); + this._clientId = (this.parent.parent as Spanner)._nthClientId; this._observabilityOptions = instance._observabilityOptions; this.commonHeaders_ = getCommonHeaders( this.formattedName_, @@ -492,8 +494,6 @@ class Database extends common.GrpcServiceObject { Object.assign({}, queryOptions), Database.getEnvironmentQueryOptions() ); - this._nthRequest = newAtomicCounter(0); - this._clientId = 0; } _nextNthRequest(): number { @@ -708,6 +708,12 @@ class Database extends common.GrpcServiceObject { addLeaderAwareRoutingHeader(headers); } + const allHeaders = this._metadataWithRequestId( + this._nextNthRequest(), + 1, + headers + ); + startTrace('Database.batchCreateSessions', this._traceConfig, span => { this.request( { @@ -715,11 +721,7 @@ class Database extends common.GrpcServiceObject { method: 'batchCreateSessions', reqOpts, gaxOpts: options.gaxOptions, - headers: this._metadataWithRequestId( - this._nextNthRequest(), - 1, - headers - ), + headers: allHeaders, }, (err, resp) => { if (err) { @@ -743,11 +745,6 @@ class Database extends common.GrpcServiceObject { }); } - private channelId(): number { - // TODO: Infer channelId from the actual gRPC channel. - return 1; - } - public _metadataWithRequestId( nthRequest: number, attempt: number, @@ -761,7 +758,7 @@ class Database extends common.GrpcServiceObject { }; withReqId[X_GOOG_SPANNER_REQUEST_ID_HEADER] = craftRequestId( this._clientId, - this.channelId(), + 1, // TODO: Properly infer the channelId nthRequest, attempt ); diff --git a/test/spanner.ts b/test/spanner.ts index b6ee01024..108888e72 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -26,6 +26,7 @@ import { Spanner, Transaction, } from '../src'; +import {MetadataValue} from '@grpc/grpc-js'; import * as mock from './mockserver/mockspanner'; import { MockError, @@ -5093,6 +5094,42 @@ describe('Spanner with mock server', () => { }); }); }); + + describe("XGoogRequestId", () => { + it('with retry on aborted query', async() => { + let attempts = 0; + const database = newTestDatabase(); + let rowCount = 0; + await database.runTransactionAsync(async (transaction) => { + if (!attempts) { + spannerMock.abortTransaction(transaction!); + } + attempts++; + const [rows] = await transaction!.run(selectSql); + rows.forEach(() => rowCount++); + assert.strictEqual(rowCount, 3); + assert.strictEqual(attempts, 2); + await transaction!.commit(); + }); + + const sentMetadata = spannerMock.getMetadata(); + const sentRequests = spannerMock.getRequests(); + var xGoogRequestHeaders: MetadataValue[] = []; + for (const index in sentMetadata) { + const req = sentRequests[index]; + console.log(index, "req", req.constructor.name); + } + + for (const md of sentMetadata) { + const got = md.get("x-goog-spanner-request-id"); + if (got) { + xGoogRequestHeaders.push(...got); + } + } + console.log("xGoogHeaders", xGoogRequestHeaders!); + await database.close(); + }); + }); }); function executeSimpleUpdate( From af1ad20e42bc51905d33c43792425e4b7099e51d Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Sun, 22 Dec 2024 02:52:37 -0800 Subject: [PATCH 03/12] Plumb grpc.Server interceptors to assert for each call to aid in trivial elimination --- src/.instrumentation-spanner/index.ts | 16 ++ .../instrumentation.ts | 204 ++++++++++++++++++ src/.instrumentation-spanner/old-index_ts | 99 +++++++++ src/.instrumentation-spanner/types.ts | 25 +++ src/.instrumentation-spanner/version.ts | 18 ++ src/database.ts | 6 +- src/index.ts | 15 ++ src/request_id_header.ts | 86 ++++++++ test/common/helper.ts | 44 ++++ test/spanner.ts | 23 +- 10 files changed, 524 insertions(+), 12 deletions(-) create mode 100644 src/.instrumentation-spanner/index.ts create mode 100644 src/.instrumentation-spanner/instrumentation.ts create mode 100644 src/.instrumentation-spanner/old-index_ts create mode 100644 src/.instrumentation-spanner/types.ts create mode 100644 src/.instrumentation-spanner/version.ts create mode 100644 test/common/helper.ts diff --git a/src/.instrumentation-spanner/index.ts b/src/.instrumentation-spanner/index.ts new file mode 100644 index 000000000..3efe082da --- /dev/null +++ b/src/.instrumentation-spanner/index.ts @@ -0,0 +1,16 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +export * from './instrumentation'; +export * from './types'; diff --git a/src/.instrumentation-spanner/instrumentation.ts b/src/.instrumentation-spanner/instrumentation.ts new file mode 100644 index 000000000..3ca936bc9 --- /dev/null +++ b/src/.instrumentation-spanner/instrumentation.ts @@ -0,0 +1,204 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { + InstrumentationBase, + InstrumentationNodeModuleDefinition, + isWrapped, +} from '@opentelemetry/instrumentation'; +import { + SEMATTRS_DB_SQL_TABLE, + SEMATTRS_DB_STATEMENT, + SEMATTRS_DB_SYSTEM, +} from '@opentelemetry/semantic-conventions'; +import { + Span, + SpanStatusCode, + Context, + context, + trace, + SpanKind, +} from '@opentelemetry/api'; +import {PACKAGE_NAME, PACKAGE_VERSION} from './version'; +import type * as spannerTypes from '..'; +import {SpannerInstrumentationConfig} from './types'; + +const optedInPII: boolean = + process.env.SPANNER_ENABLE_EXTENDED_TRACING === 'true'; + +interface SQLStatement { + sql: string; +} + +const {DiagConsoleLogger, DiagLogLevel, diag} = require('@opentelemetry/api'); +diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.ALL); + +export class SpannerInstrumentation extends InstrumentationBase { + static readonly COMMON_ATTRIBUTES = { + SEMATTRS_DB_SYSTEM: 'spanner', + }; + + constructor(config: SpannerInstrumentationConfig = {}) { + super(PACKAGE_NAME, PACKAGE_VERSION, config); + } + + protected init() { + console.log('SpannerInstrumentation.init invoked'); + + return [ + new InstrumentationNodeModuleDefinition( + '@google-cloud/spanner', + ['*'], + (moduleExports: typeof spannerTypes) => { + console.log('invoking wrapping'); + if (isWrapped(moduleExports.Instance.database)) { + this._unwrap(moduleExports, 'Instance'); + } + this._wrap( + moduleExports, + 'Instance', + this._patchCreateDatabase() as any + ); + } + ), + ]; + } + + private _patchCreateDatabase() { + console.log('_patchCreateDatabase'); + return (originalCreateDatabase: Function) => { + console.log('wrapping for patchCreateDatabase'); + const plugin = this; + + return function createDatabase() { + const db = originalCreateDatabase(...arguments); + console.log('createDatabase'); + + plugin._wrap(db, 'run', plugin._patchDatabaseRun(db) as any); + + return db; + }; + }; + } + + private _patchDatabaseRun(db: typeof spannerTypes.Database) { + return (originalDatabaseRun: Function) => { + const plugin = this; + + return function databaseRun() { + if (!plugin['_enabled']) { + plugin._unwrap(db, 'run'); + return originalDatabaseRun.apply(db, arguments); + } + + const query = arguments[0] || ''; + const cbIndex = Array.from(arguments).findIndex( + arg => typeof arg === 'function' + ); + const span = plugin.startTrace('Database.run', {sql: query}); + + const parentContext = context.active(); + if (cbIndex === -1) { + // We've got a promise not a callback. + const streamableQuery = context.with( + trace.setSpan(context.active(), span), + () => { + return originalDatabaseRun.apply(db, arguments); + } + ); + context.bind(parentContext, streamableQuery); + + return streamableQuery + .on('error', err => { + setSpanError(span, err); + }) + .on('end', () => { + span.end(); + }); + } + + // Here we've got a callback hence can wrap it differently. + plugin._wrap( + arguments, + cbIndex, + plugin._patchCallbackRun(span, parentContext) + ); + + return context.with(trace.setSpan(context.active(), span), () => { + return originalDatabaseRun.apply(db, arguments); + }); + }; + }; + } + + private _patchCallbackRun(span: Span, parentContext: Context) { + return (originalCallback: Function) => { + return (err: Error, rows: any) => { + setSpanError(span, err); + span.end(); + return context.with(parentContext, () => { + originalCallback(...arguments); + }); + }; + }; + } + + private startTrace( + spanNameSuffix: string, + opts: {tableName?: string; sql?: string | SQLStatement} + ): Span { + const span = this.tracer.startSpan( + 'cloud.google.com/nodejs/spanner/' + spanNameSuffix, + {kind: SpanKind.CLIENT} + ); + + if (opts.tableName) { + span.setAttribute(SEMATTRS_DB_SQL_TABLE, opts.tableName); + } + + const definedExtendedTracing = + this._config.enableExtendedTracing !== undefined; + // If they optedInPII but opts.enableExtendedTracing=false, reject it. + const explicitlySkipET = + definedExtendedTracing && !this._config.enableExtendedTracing; + if ( + opts.sql && + !explicitlySkipET && + (this._config.enableExtendedTracing || optedInPII) + ) { + const sql = opts.sql; + if (typeof sql === 'string') { + span.setAttribute(SEMATTRS_DB_STATEMENT, sql as string); + } else { + const stmt = sql as SQLStatement; + span.setAttribute(SEMATTRS_DB_STATEMENT, stmt.sql); + } + } + + return span; + } +} + +// setSpanError sets err, if non-nil onto the span with +// status.code=ERROR and the message of err.toString() +function setSpanError(span: Span, err: Error | String) { + if (!err || !span) { + return; + } + + span.setStatus({ + code: SpanStatusCode.ERROR, + message: err.toString(), + }); +} diff --git a/src/.instrumentation-spanner/old-index_ts b/src/.instrumentation-spanner/old-index_ts new file mode 100644 index 000000000..c1cdfb51c --- /dev/null +++ b/src/.instrumentation-spanner/old-index_ts @@ -0,0 +1,99 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { + InstrumentationBase, + InstrumentationConfig, + InstrumentationNodeModuleDefinition, + InstrumentationNodeModuleFile, +} from '@opentelemetry/instrumentation'; + +import { + context, + trace, + Span, +} from '@opentelemetry/api'; + +export class SpannerInstrumentation extends InstrumentationBase { + constructor(config: InstrumentationConfig = {}) { + super('@google-cloud/spanner', 'v1.0.0', config); + } + + /* + * The init method will be called when the plugin is constructed. + * It returns an IntrumentationNodeModuleDefinition which describes + * the node module to be instrumented and patched. + * It might also return a list of InstrumentationNodeModuleDefinitions if + * the plugin should patch multiple omodules or versions. + */ + protected init(): [InstrumentationNodeModuleDefinition] { + console.log('spanner-instrumentation invoked'); + + return [ + new InstrumentationNodeModuleDefinition( + '@google-cloud/spanner', + ['*'], + moduleExports => { + this._wrap( + moduleExports.Spanner.prototype, + }), + ]; + } + + private _onPatchMain(moduleExports: typeof Database) { + console.log('_onPatchMain'); + this._wrap( + moduleExports.Database.prototype, + 'run', + this._patchMainMethodName()); + return moduleExports; + } + + private _onUnPatchMain(moduleExports: typeof Database) { + this._unwrap(moduleExports, 'run'); + } + + private _onPatchMethodName(moduleExports: typeof Database) { + console.log('_onPatchMethodName'); + this._wrap( + moduleExports, + 'run', + this._patchMethodName()); + return moduleExports; + } + + private _onUnPatchMethodName(moduleExports: typeof Database) { + this._unwrap(moduleExports, 'run'); + } + + private _patchMethodName(): (original) => any { + const plugin = this; + return function methodName(original) { + return function patchMethodName(this: any): Promise { + console.log('methodName', arguments); + return original.apply(this, arguments); + } + } + } + + private _patchMainMethodName(): (original) => any { + const plugin = this; + return function mainMethodName(original) { + return function patchMainMethodName(this: any): Promise { + console.log('mainMethodName', arguments); + return original.apply(this, arguments); + } + } + } +} diff --git a/src/.instrumentation-spanner/types.ts b/src/.instrumentation-spanner/types.ts new file mode 100644 index 000000000..18098cb7e --- /dev/null +++ b/src/.instrumentation-spanner/types.ts @@ -0,0 +1,25 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {InstrumentationConfig} from '@opentelemetry/instrumentation'; + +export interface SpannerInstrumentationConfig extends InstrumentationConfig { + // enableExtendedTracing when set to true allows spans to be + // annotated with the SQL being executed, where applicable. + enableExtendedTracing?: boolean; +} + +export interface DatabaseKind { + run(): any; +} diff --git a/src/.instrumentation-spanner/version.ts b/src/.instrumentation-spanner/version.ts new file mode 100644 index 000000000..e3b95e7d9 --- /dev/null +++ b/src/.instrumentation-spanner/version.ts @@ -0,0 +1,18 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +const PACKAGE_NAME = 'spanner'; +const PACKAGE_VERSION = 'v1.0.0'; + +export {PACKAGE_NAME, PACKAGE_VERSION}; diff --git a/src/database.ts b/src/database.ts index b230f7052..006cff8ea 100644 --- a/src/database.ts +++ b/src/database.ts @@ -709,9 +709,9 @@ class Database extends common.GrpcServiceObject { } const allHeaders = this._metadataWithRequestId( - this._nextNthRequest(), - 1, - headers + this._nextNthRequest(), + 1, + headers ); startTrace('Database.batchCreateSessions', this._traceConfig, span => { diff --git a/src/index.ts b/src/index.ts index 5360f596d..01336e46a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -148,6 +148,8 @@ export interface SpannerOptions extends GrpcClientOptions { routeToLeaderEnabled?: boolean; directedReadOptions?: google.spanner.v1.IDirectedReadOptions | null; observabilityOptions?: ObservabilityOptions; + unaryInterceptors?: any[]; + streamInterceptors?: any[]; } export interface RequestConfig { client: string; @@ -312,6 +314,14 @@ class Spanner extends GrpcService { } } } + + let unaryInterceptors: any[] = []; + let streamInterceptors: any[] = []; + if (options) { + unaryInterceptors = options.unaryInterceptors || []; + streamInterceptors = options.streamInterceptors || []; + } + options = Object.assign( { libName: 'gccl', @@ -324,6 +334,11 @@ class Spanner extends GrpcService { 'grpc.callInvocationTransformer': grpcGcp.gcpCallInvocationTransformer, 'grpc.channelFactoryOverride': grpcGcp.gcpChannelFactoryOverride, 'grpc.gcpApiConfig': grpcGcp.createGcpApiConfig(gcpApiConfig), + + // TODO: Negotiate with the Google team to plumb gRPC + // settings such as interceptors to the gRPC client. + 'grpc.unaryInterceptors': unaryInterceptors, + 'grpc.streamInterceptors': streamInterceptors, grpc, }, options || {} diff --git a/src/request_id_header.ts b/src/request_id_header.ts index 5f660baf8..c29b72430 100644 --- a/src/request_id_header.ts +++ b/src/request_id_header.ts @@ -15,6 +15,7 @@ */ import {randomBytes} from 'crypto'; +import * as grpc from '@grpc/grpc-js'; const randIdForProcess = randomBytes(8).readBigUint64LE(0).toString(); const X_GOOG_SPANNER_REQUEST_ID_HEADER = 'x-goog-spanner-request-id'; @@ -72,10 +73,95 @@ function newAtomicCounter(n?: number): AtomicCounter { return new AtomicCounter(n); } +const X_GOOG_REQ_ID_REGEX = /(\d+\.){5}\d+/; + +class XGoogRequestHeaderInterceptor { + private nStream: number; + private nUnary: number; + private streamCalls: any[]; + private unaryCalls: any[]; + constructor() { + this.nStream = 0; + this.streamCalls = []; + this.nUnary = 0; + this.unaryCalls = []; + } + + assertHasHeader(call): string | unknown { + const metadata = call.metadata; + const gotReqId = metadata[X_GOOG_SPANNER_REQUEST_ID_HEADER]; + if (!gotReqId) { + throw new Error( + `${call.method} is missing ${X_GOOG_SPANNER_REQUEST_ID_HEADER} header` + ); + } + + if (!gotReqId.match(X_GOOG_REQ_ID_REGEX)) { + throw new Error( + `${call.method} reqID header ${gotReqId} does not match ${X_GOOG_REQ_ID_REGEX}` + ); + } + return gotReqId; + } + + interceptUnary(call, next) { + const gotReqId = this.assertHasHeader(call); + this.unaryCalls.push({method: call.method, reqId: gotReqId}); + this.nUnary++; + next(call); + } + + interceptStream(call, next) { + const gotReqId = this.assertHasHeader(call); + this.streamCalls.push({method: call.method, reqId: gotReqId}); + this.nStream++; + next(call); + } + + serverInterceptor(methodDescriptor, call) { + const method = call.handler.path; + const isUnary = call.handler.type === 'unary'; + const listener = new grpc.ServerListenerBuilder() + .withOnReceiveMetadata((metadata, next) => { + const gotReqId = metadata[X_GOOG_SPANNER_REQUEST_ID_HEADER]; + if (!gotReqId) { + call.sendStatus({ + code: grpc.status.INVALID_ARGUMENT, + details: `${method} is missing ${X_GOOG_SPANNER_REQUEST_ID_HEADER} header`, + }); + return; + } + + if (!gotReqId.match(X_GOOG_REQ_ID_REGEX)) { + call.sendStatus({ + code: grpc.status.INVALID_ARGUMENT, + details: `${method} reqID header ${gotReqId} does not match ${X_GOOG_REQ_ID_REGEX}`, + }); + } + + // Otherwise it matched all good. + if (isUnary) { + this.unaryCalls.push({method: method, reqId: gotReqId}); + this.nUnary++; + } else { + this.streamCalls.push({method: method, reqId: gotReqId}); + this.nStream++; + } + }) + .build(); + + const responder = new grpc.ResponderBuilder() + .withStart(next => next(listener)) + .build(); + return new grpc.ServerInterceptingCall(call, responder); + } +} + export { AtomicCounter, X_GOOG_SPANNER_REQUEST_ID_HEADER, craftRequestId, nextSpannerClientId, newAtomicCounter, + XGoogRequestHeaderInterceptor, }; diff --git a/test/common/helper.ts b/test/common/helper.ts new file mode 100644 index 000000000..fee1d8336 --- /dev/null +++ b/test/common/helper.ts @@ -0,0 +1,44 @@ +/*! + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {EventEmitter} from 'events'; +import * as through from 'through2'; + +export class FakeTransaction extends EventEmitter { + _ended: boolean; + constructor() { + super(); + this._ended = false; + } + commit(gaxOptions, callback) { + this.end(); + callback(null, {}); + } + createReadStream() { + return through.obj(); + } + deleteRows() {} + end() { + if (!this._ended) { + this.emit('end'); + this._ended = true; + } + } + insert() {} + replace() {} + upsert() {} + update() {} +} diff --git a/test/spanner.ts b/test/spanner.ts index 108888e72..558c21a8a 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -26,7 +26,6 @@ import { Spanner, Transaction, } from '../src'; -import {MetadataValue} from '@grpc/grpc-js'; import * as mock from './mockserver/mockspanner'; import { MockError, @@ -56,6 +55,7 @@ import { CLOUD_RESOURCE_HEADER, LEADER_AWARE_ROUTING_HEADER, } from '../src/common'; +import {XGoogRequestHeaderInterceptor} from '../src/request_id_header'; import CreateInstanceMetadata = google.spanner.admin.instance.v1.CreateInstanceMetadata; import QueryOptions = google.spanner.v1.ExecuteSqlRequest.QueryOptions; import v1 = google.spanner.v1; @@ -103,7 +103,10 @@ describe('Spanner with mock server', () => { const fooNotFoundErr = Object.assign(new Error('Table FOO not found'), { code: grpc.status.NOT_FOUND, }); - const server = new grpc.Server(); + const xGoogReqIDInterceptor = new XGoogRequestHeaderInterceptor(); + const server = new grpc.Server({ + interceptors: [xGoogReqIDInterceptor.serverInterceptor], + }); const spannerMock = mock.createMockSpanner(server); mockInstanceAdmin.createMockInstanceAdmin(server); mockDatabaseAdmin.createMockDatabaseAdmin(server); @@ -168,6 +171,8 @@ describe('Spanner with mock server', () => { servicePath: 'localhost', port, sslCreds: grpc.credentials.createInsecure(), + streamInterceptors: [xGoogReqIDInterceptor.interceptStream], + unaryInterceptors: [xGoogReqIDInterceptor.interceptUnary], }); // Gets a reference to a Cloud Spanner instance and database instance = spanner.instance('instance'); @@ -5100,7 +5105,7 @@ describe('Spanner with mock server', () => { let attempts = 0; const database = newTestDatabase(); let rowCount = 0; - await database.runTransactionAsync(async (transaction) => { + await database.runTransactionAsync(async transaction => { if (!attempts) { spannerMock.abortTransaction(transaction!); } @@ -5114,19 +5119,19 @@ describe('Spanner with mock server', () => { const sentMetadata = spannerMock.getMetadata(); const sentRequests = spannerMock.getRequests(); - var xGoogRequestHeaders: MetadataValue[] = []; + const xGoogRequestHeaders = []; for (const index in sentMetadata) { - const req = sentRequests[index]; - console.log(index, "req", req.constructor.name); + const req = sentRequests[index]; + console.log(index, 'req', req.constructor.name); } for (const md of sentMetadata) { - const got = md.get("x-goog-spanner-request-id"); + const got = md.get('x-goog-spanner-request-id'); if (got) { - xGoogRequestHeaders.push(...got); + xGoogRequestHeaders.push(...got); } } - console.log("xGoogHeaders", xGoogRequestHeaders!); + console.log('xGoogHeaders', xGoogRequestHeaders!); await database.close(); }); }); From bca3513f8db257807d478a0c8d0a70448064d6f4 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Sun, 22 Dec 2024 04:33:14 -0800 Subject: [PATCH 04/12] Plumb tests with x-goog-spanner-request-id --- src/database.ts | 18 ++++++++++++-- src/request_id_header.ts | 52 +++++++++++++++++++++++++++++++++------- src/transaction.ts | 15 ++++++++++-- test/spanner.ts | 17 +++++++++---- 4 files changed, 86 insertions(+), 16 deletions(-) diff --git a/src/database.ts b/src/database.ts index 006cff8ea..1618315b3 100644 --- a/src/database.ts +++ b/src/database.ts @@ -474,7 +474,11 @@ class Database extends common.GrpcServiceObject { this.request = instance.request; this._nthRequest = newAtomicCounter(0); - this._clientId = (this.parent.parent as Spanner)._nthClientId; + if (this.parent && this.parent.parent) { + this._clientId = (this.parent.parent as Spanner)._nthClientId; + } else { + this._clientId = instance._nthClientId; + } this._observabilityOptions = instance._observabilityOptions; this.commonHeaders_ = getCommonHeaders( this.formattedName_, @@ -1030,7 +1034,11 @@ class Database extends common.GrpcServiceObject { reqOpts.session.creatorRole = options.databaseRole || this.databaseRole || null; - const headers = this.commonHeaders_; + const headers = this._metadataWithRequestId( + this._nextNthRequest(), + 1, + this.commonHeaders_, + ); if (this._getSpanner().routeToLeaderEnabled) { addLeaderAwareRoutingHeader(headers); } @@ -1951,6 +1959,12 @@ class Database extends common.GrpcServiceObject { delete (gaxOpts as GetSessionsOptions).pageToken; } + const headers = this._metadataWithRequestId( + this._nextNthRequest(), + 1, + this.resourceHeader_ + ); + return startTrace('Database.getSessions', this._traceConfig, span => { this.request< google.spanner.v1.ISession, diff --git a/src/request_id_header.ts b/src/request_id_header.ts index c29b72430..049ee52d8 100644 --- a/src/request_id_header.ts +++ b/src/request_id_header.ts @@ -80,11 +80,13 @@ class XGoogRequestHeaderInterceptor { private nUnary: number; private streamCalls: any[]; private unaryCalls: any[]; - constructor() { + private prefixesToIgnore?: string[]; + constructor(prefixesToIgnore?: string[]) { this.nStream = 0; this.streamCalls = []; this.nUnary = 0; this.unaryCalls = []; + this.prefixesToIgnore = prefixesToIgnore || []; } assertHasHeader(call): string | unknown { @@ -118,13 +120,36 @@ class XGoogRequestHeaderInterceptor { next(call); } + generateServerInterceptor() { + return this.serverInterceptor.bind(this); + } + + reset() { + this.nStream = 0; + this.streamCalls = []; + this.nUnary = 0; + this.unaryCalls = []; + } + serverInterceptor(methodDescriptor, call) { const method = call.handler.path; const isUnary = call.handler.type === 'unary'; + const that = this; const listener = new grpc.ServerListenerBuilder() .withOnReceiveMetadata((metadata, next) => { - const gotReqId = metadata[X_GOOG_SPANNER_REQUEST_ID_HEADER]; - if (!gotReqId) { + let i = 0; + const prefixesToIgnore: string[] = that.prefixesToIgnore || []; + for (i = 0; i < prefixesToIgnore.length; i++) { + const prefix = prefixesToIgnore[i]; + console.log(`prefix: ${prefix}\nmethod: ${method}`); + if (method.startsWith(prefix)) { + next(metadata); + return; + } + } + + const gotReqIds = metadata.get(X_GOOG_SPANNER_REQUEST_ID_HEADER); + if (!(gotReqIds && gotReqIds.length > 0)) { call.sendStatus({ code: grpc.status.INVALID_ARGUMENT, details: `${method} is missing ${X_GOOG_SPANNER_REQUEST_ID_HEADER} header`, @@ -132,21 +157,32 @@ class XGoogRequestHeaderInterceptor { return; } + if (gotReqIds.length !== 1) { + call.sendStatus({ + code: grpc.status.INVALID_ARGUMENT, + details: `${method} set multiple ${X_GOOG_SPANNER_REQUEST_ID_HEADER} headers: ${gotReqIds}`, + }); + return; + } + + const gotReqId = gotReqIds[0].toString(); if (!gotReqId.match(X_GOOG_REQ_ID_REGEX)) { call.sendStatus({ code: grpc.status.INVALID_ARGUMENT, details: `${method} reqID header ${gotReqId} does not match ${X_GOOG_REQ_ID_REGEX}`, }); + return; } - // Otherwise it matched all good. if (isUnary) { - this.unaryCalls.push({method: method, reqId: gotReqId}); - this.nUnary++; + that.unaryCalls.push({method: method, reqId: gotReqId}); + that.nUnary++; } else { - this.streamCalls.push({method: method, reqId: gotReqId}); - this.nStream++; + that.streamCalls.push({method: method, reqId: gotReqId}); + that.nStream++; } + + next(metadata); }) .build(); diff --git a/src/transaction.ts b/src/transaction.ts index 494191c98..d2e6a6229 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -1976,7 +1976,13 @@ export class Transaction extends Dml { statements, } as spannerClient.spanner.v1.ExecuteBatchDmlRequest; - const headers = this.commonHeaders_; + const database = this.session.parent as Database; + const headers = this.session._metadataWithRequestId( + database._nextNthRequest(), + 1, + this.commonHeaders_, + this.resourceHeader_ + ); if (this._getSpanner().routeToLeaderEnabled) { addLeaderAwareRoutingHeader(headers); } @@ -2209,13 +2215,18 @@ export class Transaction extends Dml { span.addEvent('Starting Commit'); + const database = this.session.parent as Database; this.request( { client: 'SpannerClient', method: 'commit', reqOpts, gaxOpts: gaxOpts, - headers: headers, + headers: this.session._metadataWithRequestId( + database._nextNthRequest(), + 1, + headers + ), }, (err: null | Error, resp: spannerClient.spanner.v1.ICommitResponse) => { this.end(); diff --git a/test/spanner.ts b/test/spanner.ts index 558c21a8a..65e6cc428 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -103,9 +103,12 @@ describe('Spanner with mock server', () => { const fooNotFoundErr = Object.assign(new Error('Table FOO not found'), { code: grpc.status.NOT_FOUND, }); - const xGoogReqIDInterceptor = new XGoogRequestHeaderInterceptor(); + const xGoogReqIDInterceptor = new XGoogRequestHeaderInterceptor([ + '/google.spanner.admin', + '/google.spanner.admin.database.v1.DatabaseAdmin', + ]); const server = new grpc.Server({ - interceptors: [xGoogReqIDInterceptor.serverInterceptor], + interceptors: [xGoogReqIDInterceptor.generateServerInterceptor()], }); const spannerMock = mock.createMockSpanner(server); mockInstanceAdmin.createMockInstanceAdmin(server); @@ -119,6 +122,10 @@ describe('Spanner with mock server', () => { return instance.database(`database-${dbCounter++}`, options); } + beforeEach(() => { + xGoogReqIDInterceptor.reset(); + }); + before(async () => { sandbox = sinon.createSandbox(); port = await new Promise((resolve, reject) => { @@ -5119,7 +5126,7 @@ describe('Spanner with mock server', () => { const sentMetadata = spannerMock.getMetadata(); const sentRequests = spannerMock.getRequests(); - const xGoogRequestHeaders = []; + const xGoogRequestHeaders: grpc.MetadataValue[] = []; for (const index in sentMetadata) { const req = sentRequests[index]; console.log(index, 'req', req.constructor.name); @@ -5128,7 +5135,9 @@ describe('Spanner with mock server', () => { for (const md of sentMetadata) { const got = md.get('x-goog-spanner-request-id'); if (got) { - xGoogRequestHeaders.push(...got); + for (const value of got) { + xGoogRequestHeaders.push(value); + } } } console.log('xGoogHeaders', xGoogRequestHeaders!); From 86189c3f648dff92c18e6230bd0439187fe0490d Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Sun, 22 Dec 2024 04:35:45 -0800 Subject: [PATCH 05/12] Remove accidentally committed code --- src/.instrumentation-spanner/index.ts | 16 -- .../instrumentation.ts | 204 ------------------ src/.instrumentation-spanner/old-index_ts | 99 --------- src/.instrumentation-spanner/types.ts | 25 --- src/.instrumentation-spanner/version.ts | 18 -- test/common/helper.ts | 44 ---- 6 files changed, 406 deletions(-) delete mode 100644 src/.instrumentation-spanner/index.ts delete mode 100644 src/.instrumentation-spanner/instrumentation.ts delete mode 100644 src/.instrumentation-spanner/old-index_ts delete mode 100644 src/.instrumentation-spanner/types.ts delete mode 100644 src/.instrumentation-spanner/version.ts delete mode 100644 test/common/helper.ts diff --git a/src/.instrumentation-spanner/index.ts b/src/.instrumentation-spanner/index.ts deleted file mode 100644 index 3efe082da..000000000 --- a/src/.instrumentation-spanner/index.ts +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2024 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -export * from './instrumentation'; -export * from './types'; diff --git a/src/.instrumentation-spanner/instrumentation.ts b/src/.instrumentation-spanner/instrumentation.ts deleted file mode 100644 index 3ca936bc9..000000000 --- a/src/.instrumentation-spanner/instrumentation.ts +++ /dev/null @@ -1,204 +0,0 @@ -// Copyright 2024 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -import { - InstrumentationBase, - InstrumentationNodeModuleDefinition, - isWrapped, -} from '@opentelemetry/instrumentation'; -import { - SEMATTRS_DB_SQL_TABLE, - SEMATTRS_DB_STATEMENT, - SEMATTRS_DB_SYSTEM, -} from '@opentelemetry/semantic-conventions'; -import { - Span, - SpanStatusCode, - Context, - context, - trace, - SpanKind, -} from '@opentelemetry/api'; -import {PACKAGE_NAME, PACKAGE_VERSION} from './version'; -import type * as spannerTypes from '..'; -import {SpannerInstrumentationConfig} from './types'; - -const optedInPII: boolean = - process.env.SPANNER_ENABLE_EXTENDED_TRACING === 'true'; - -interface SQLStatement { - sql: string; -} - -const {DiagConsoleLogger, DiagLogLevel, diag} = require('@opentelemetry/api'); -diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.ALL); - -export class SpannerInstrumentation extends InstrumentationBase { - static readonly COMMON_ATTRIBUTES = { - SEMATTRS_DB_SYSTEM: 'spanner', - }; - - constructor(config: SpannerInstrumentationConfig = {}) { - super(PACKAGE_NAME, PACKAGE_VERSION, config); - } - - protected init() { - console.log('SpannerInstrumentation.init invoked'); - - return [ - new InstrumentationNodeModuleDefinition( - '@google-cloud/spanner', - ['*'], - (moduleExports: typeof spannerTypes) => { - console.log('invoking wrapping'); - if (isWrapped(moduleExports.Instance.database)) { - this._unwrap(moduleExports, 'Instance'); - } - this._wrap( - moduleExports, - 'Instance', - this._patchCreateDatabase() as any - ); - } - ), - ]; - } - - private _patchCreateDatabase() { - console.log('_patchCreateDatabase'); - return (originalCreateDatabase: Function) => { - console.log('wrapping for patchCreateDatabase'); - const plugin = this; - - return function createDatabase() { - const db = originalCreateDatabase(...arguments); - console.log('createDatabase'); - - plugin._wrap(db, 'run', plugin._patchDatabaseRun(db) as any); - - return db; - }; - }; - } - - private _patchDatabaseRun(db: typeof spannerTypes.Database) { - return (originalDatabaseRun: Function) => { - const plugin = this; - - return function databaseRun() { - if (!plugin['_enabled']) { - plugin._unwrap(db, 'run'); - return originalDatabaseRun.apply(db, arguments); - } - - const query = arguments[0] || ''; - const cbIndex = Array.from(arguments).findIndex( - arg => typeof arg === 'function' - ); - const span = plugin.startTrace('Database.run', {sql: query}); - - const parentContext = context.active(); - if (cbIndex === -1) { - // We've got a promise not a callback. - const streamableQuery = context.with( - trace.setSpan(context.active(), span), - () => { - return originalDatabaseRun.apply(db, arguments); - } - ); - context.bind(parentContext, streamableQuery); - - return streamableQuery - .on('error', err => { - setSpanError(span, err); - }) - .on('end', () => { - span.end(); - }); - } - - // Here we've got a callback hence can wrap it differently. - plugin._wrap( - arguments, - cbIndex, - plugin._patchCallbackRun(span, parentContext) - ); - - return context.with(trace.setSpan(context.active(), span), () => { - return originalDatabaseRun.apply(db, arguments); - }); - }; - }; - } - - private _patchCallbackRun(span: Span, parentContext: Context) { - return (originalCallback: Function) => { - return (err: Error, rows: any) => { - setSpanError(span, err); - span.end(); - return context.with(parentContext, () => { - originalCallback(...arguments); - }); - }; - }; - } - - private startTrace( - spanNameSuffix: string, - opts: {tableName?: string; sql?: string | SQLStatement} - ): Span { - const span = this.tracer.startSpan( - 'cloud.google.com/nodejs/spanner/' + spanNameSuffix, - {kind: SpanKind.CLIENT} - ); - - if (opts.tableName) { - span.setAttribute(SEMATTRS_DB_SQL_TABLE, opts.tableName); - } - - const definedExtendedTracing = - this._config.enableExtendedTracing !== undefined; - // If they optedInPII but opts.enableExtendedTracing=false, reject it. - const explicitlySkipET = - definedExtendedTracing && !this._config.enableExtendedTracing; - if ( - opts.sql && - !explicitlySkipET && - (this._config.enableExtendedTracing || optedInPII) - ) { - const sql = opts.sql; - if (typeof sql === 'string') { - span.setAttribute(SEMATTRS_DB_STATEMENT, sql as string); - } else { - const stmt = sql as SQLStatement; - span.setAttribute(SEMATTRS_DB_STATEMENT, stmt.sql); - } - } - - return span; - } -} - -// setSpanError sets err, if non-nil onto the span with -// status.code=ERROR and the message of err.toString() -function setSpanError(span: Span, err: Error | String) { - if (!err || !span) { - return; - } - - span.setStatus({ - code: SpanStatusCode.ERROR, - message: err.toString(), - }); -} diff --git a/src/.instrumentation-spanner/old-index_ts b/src/.instrumentation-spanner/old-index_ts deleted file mode 100644 index c1cdfb51c..000000000 --- a/src/.instrumentation-spanner/old-index_ts +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright 2024 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -import { - InstrumentationBase, - InstrumentationConfig, - InstrumentationNodeModuleDefinition, - InstrumentationNodeModuleFile, -} from '@opentelemetry/instrumentation'; - -import { - context, - trace, - Span, -} from '@opentelemetry/api'; - -export class SpannerInstrumentation extends InstrumentationBase { - constructor(config: InstrumentationConfig = {}) { - super('@google-cloud/spanner', 'v1.0.0', config); - } - - /* - * The init method will be called when the plugin is constructed. - * It returns an IntrumentationNodeModuleDefinition which describes - * the node module to be instrumented and patched. - * It might also return a list of InstrumentationNodeModuleDefinitions if - * the plugin should patch multiple omodules or versions. - */ - protected init(): [InstrumentationNodeModuleDefinition] { - console.log('spanner-instrumentation invoked'); - - return [ - new InstrumentationNodeModuleDefinition( - '@google-cloud/spanner', - ['*'], - moduleExports => { - this._wrap( - moduleExports.Spanner.prototype, - }), - ]; - } - - private _onPatchMain(moduleExports: typeof Database) { - console.log('_onPatchMain'); - this._wrap( - moduleExports.Database.prototype, - 'run', - this._patchMainMethodName()); - return moduleExports; - } - - private _onUnPatchMain(moduleExports: typeof Database) { - this._unwrap(moduleExports, 'run'); - } - - private _onPatchMethodName(moduleExports: typeof Database) { - console.log('_onPatchMethodName'); - this._wrap( - moduleExports, - 'run', - this._patchMethodName()); - return moduleExports; - } - - private _onUnPatchMethodName(moduleExports: typeof Database) { - this._unwrap(moduleExports, 'run'); - } - - private _patchMethodName(): (original) => any { - const plugin = this; - return function methodName(original) { - return function patchMethodName(this: any): Promise { - console.log('methodName', arguments); - return original.apply(this, arguments); - } - } - } - - private _patchMainMethodName(): (original) => any { - const plugin = this; - return function mainMethodName(original) { - return function patchMainMethodName(this: any): Promise { - console.log('mainMethodName', arguments); - return original.apply(this, arguments); - } - } - } -} diff --git a/src/.instrumentation-spanner/types.ts b/src/.instrumentation-spanner/types.ts deleted file mode 100644 index 18098cb7e..000000000 --- a/src/.instrumentation-spanner/types.ts +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2024 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -import {InstrumentationConfig} from '@opentelemetry/instrumentation'; - -export interface SpannerInstrumentationConfig extends InstrumentationConfig { - // enableExtendedTracing when set to true allows spans to be - // annotated with the SQL being executed, where applicable. - enableExtendedTracing?: boolean; -} - -export interface DatabaseKind { - run(): any; -} diff --git a/src/.instrumentation-spanner/version.ts b/src/.instrumentation-spanner/version.ts deleted file mode 100644 index e3b95e7d9..000000000 --- a/src/.instrumentation-spanner/version.ts +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright 2024 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -const PACKAGE_NAME = 'spanner'; -const PACKAGE_VERSION = 'v1.0.0'; - -export {PACKAGE_NAME, PACKAGE_VERSION}; diff --git a/test/common/helper.ts b/test/common/helper.ts deleted file mode 100644 index fee1d8336..000000000 --- a/test/common/helper.ts +++ /dev/null @@ -1,44 +0,0 @@ -/*! - * Copyright 2024 Google LLC. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import {EventEmitter} from 'events'; -import * as through from 'through2'; - -export class FakeTransaction extends EventEmitter { - _ended: boolean; - constructor() { - super(); - this._ended = false; - } - commit(gaxOptions, callback) { - this.end(); - callback(null, {}); - } - createReadStream() { - return through.obj(); - } - deleteRows() {} - end() { - if (!this._ended) { - this.emit('end'); - this._ended = true; - } - } - insert() {} - replace() {} - upsert() {} - update() {} -} From 65e1c6dfc1f667b37df8eef04e4ce24870888ff3 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Mon, 23 Dec 2024 06:32:42 -0800 Subject: [PATCH 06/12] Retrieve RequestID from headers and inject it back into Spanner.request --- src/batch-transaction.ts | 13 ++--- src/index.ts | 70 +++++++++++++++++++---- src/request_id_header.ts | 118 ++++++++++++++++++++++++++++++++++++++- src/transaction.ts | 33 ++++++----- test/spanner.ts | 3 +- 5 files changed, 201 insertions(+), 36 deletions(-) diff --git a/src/batch-transaction.ts b/src/batch-transaction.ts index 03c63a4e2..464f5388f 100644 --- a/src/batch-transaction.ts +++ b/src/batch-transaction.ts @@ -26,6 +26,7 @@ import { addLeaderAwareRoutingHeader, } from '../src/common'; import {startTrace, setSpanError, traceConfig} from './instrument'; +import {injectRequestIDIntoHeaders} from './request_id_header'; export interface TransactionIdentifier { session: string | Session; @@ -211,12 +212,12 @@ class BatchTransaction extends Snapshot { const headers = { [CLOUD_RESOURCE_HEADER]: database.formattedName_, }; - (config.headers = this.session._metadataWithRequestId( + config.headers = this.session._metadataWithRequestId( database._nextNthRequest(), 1, headers - )), - delete query.partitionOptions; + ); + delete query.partitionOptions; this.session.request(config, (err, resp) => { if (err) { setSpanError(span, err); @@ -304,11 +305,7 @@ class BatchTransaction extends Snapshot { method: 'partitionRead', reqOpts, gaxOpts: options.gaxOptions, - headers: this.session._metadataWithRequestId( - database._nextNthRequest(), - 1, - headers - ), + headers: injectRequestIDIntoHeaders(headers, this.session), }, (err, partitions, resp) => { if (err) { diff --git a/src/index.ts b/src/index.ts index 01336e46a..1df86d8b4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -23,6 +23,7 @@ import {GoogleAuth, GoogleAuthOptions} from 'google-auth-library'; import * as path from 'path'; import {common as p} from 'protobufjs'; import * as streamEvents from 'stream-events'; +import {EventEmitter} from 'events'; import * as through from 'through2'; import { codec, @@ -87,7 +88,10 @@ import { ensureInitialContextManagerSet, ensureContextPropagation, } from './instrument'; -import {AtomicCounter, nextSpannerClientId} from './request_id_header'; +import { + injectRequestIDIntoError, + nextSpannerClientId, +} from './request_id_header'; // eslint-disable-next-line @typescript-eslint/no-var-requires const gcpApiConfig = require('./spanner_grpc_config.json'); @@ -148,8 +152,7 @@ export interface SpannerOptions extends GrpcClientOptions { routeToLeaderEnabled?: boolean; directedReadOptions?: google.spanner.v1.IDirectedReadOptions | null; observabilityOptions?: ObservabilityOptions; - unaryInterceptors?: any[]; - streamInterceptors?: any[]; + interceptors?: any[]; } export interface RequestConfig { client: string; @@ -315,11 +318,9 @@ class Spanner extends GrpcService { } } - let unaryInterceptors: any[] = []; - let streamInterceptors: any[] = []; + let interceptors: any[] = []; if (options) { - unaryInterceptors = options.unaryInterceptors || []; - streamInterceptors = options.streamInterceptors || []; + interceptors = options.interceptors || []; } options = Object.assign( @@ -337,8 +338,7 @@ class Spanner extends GrpcService { // TODO: Negotiate with the Google team to plumb gRPC // settings such as interceptors to the gRPC client. - 'grpc.unaryInterceptors': unaryInterceptors, - 'grpc.streamInterceptors': streamInterceptors, + // 'grpc.interceptors': interceptors, grpc, }, options || {} @@ -359,6 +359,7 @@ class Spanner extends GrpcService { options.port = emulatorHost.port; options.sslCreds = grpc.credentials.createInsecure(); } + // console.log('options.interceptors', options.interceptors); const config = { baseUrl: options.apiEndpoint || @@ -1571,7 +1572,55 @@ class Spanner extends GrpcService { }, }) ); - callback(null, requestFn); + + const wrapped = (...args) => { + const hasCallback = + args && + args.length > 0 && + typeof args[args.length - 1] === 'function'; + + // console.log(config.method, "wrapped args", args, "requestFn", requestFn); + switch (hasCallback) { + case true: + const cb = args[args.length - 1]; + const priorArgs = args.slice(0, args.length - 1); + requestFn(...priorArgs, (...results) => { + if (results && results.length > 0) { + const err = results[0] as Error; + injectRequestIDIntoError(config, err); + } + + // console.log("wrapped with args and callback", results); + cb(...results); + }); + return; + + case false: + const res = requestFn(...args); + const stream = res as EventEmitter; + if (stream) { + stream.on('error', err => { + injectRequestIDIntoError(config, err as Error); + }); + } + + const originallyPromise = res instanceof Promise; + if (!originallyPromise) { + return res; + } + + return new Promise((resolve, reject) => { + requestFn(...args) + .then(resolve) + .catch(err => { + injectRequestIDIntoError(config, err as Error); + reject(err); + }); + }); + } + }; + + callback(null, wrapped); }); } @@ -1599,6 +1648,7 @@ class Spanner extends GrpcService { } else { return new Promise((resolve, reject) => { this.prepareGapicRequest_(config, (err, requestFn) => { + console.log('request.error', err, 'requestFn', requestFn); if (err) { reject(err); } else { diff --git a/src/request_id_header.ts b/src/request_id_header.ts index 049ee52d8..d02ee98c8 100644 --- a/src/request_id_header.ts +++ b/src/request_id_header.ts @@ -113,6 +113,10 @@ class XGoogRequestHeaderInterceptor { next(call); } + generateClientInterceptor() { + return this.interceptUnary.bind(this); + } + interceptStream(call, next) { const gotReqId = this.assertHasHeader(call); this.streamCalls.push({method: call.method, reqId: gotReqId}); @@ -131,6 +135,27 @@ class XGoogRequestHeaderInterceptor { this.unaryCalls = []; } + loggingClientInterceptor(options, call) { + const listener = new grpc.ListenerBuilder().withOnReceiveMessage( + (next, message) => { + console.log('Received message', JSON.stringify(message)); + next(message); + } + ); + + const requester = new grpc.RequesterBuilder() + .withSendMessage((next, message) => { + console.log('Requesting', call.method, JSON.stringify(message)); + next(message); + }) + .build(); + return new grpc.InterceptingCall(call(options), requester); + } + + generateLoggingClientInterceptor() { + return this.loggingClientInterceptor.bind(this); + } + serverInterceptor(methodDescriptor, call) { const method = call.handler.path; const isUnary = call.handler.type === 'unary'; @@ -141,7 +166,7 @@ class XGoogRequestHeaderInterceptor { const prefixesToIgnore: string[] = that.prefixesToIgnore || []; for (i = 0; i < prefixesToIgnore.length; i++) { const prefix = prefixesToIgnore[i]; - console.log(`prefix: ${prefix}\nmethod: ${method}`); + // console.log(`prefix: ${prefix}\nmethod: ${method}`); if (method.startsWith(prefix)) { next(metadata); return; @@ -193,11 +218,100 @@ class XGoogRequestHeaderInterceptor { } } +interface withHeaders { + headers: {[k: string]: string}; +} + +function extractRequestID(config: any): string { + if (!config) { + return ''; + } + + const hdrs = config as withHeaders; + if (hdrs && hdrs.headers) { + return hdrs.headers[X_GOOG_SPANNER_REQUEST_ID_HEADER]; + } + return ''; +} + +function injectRequestIDIntoError(config: any, err: Error) { + if (!err) { + return; + } + + // Inject that RequestID into the actual + // error object regardless of the type. + Object.assign(err, {requestID: extractRequestID(config)}); +} + +interface withNextNthRequest { + _nextNthRequest: Function; +} + +interface withMetadataWithRequestId { + _nthClientId: number; + _channelId: number; +} + +function injectRequestIDIntoHeaders( + headers: {[k: string]: string}, + session: any, + nthRequest?: number, + attempt?: number +) { + if (!session) { + return headers; + } + + if (!nthRequest) { + const database = session.parent as withNextNthRequest; + if (!(database && typeof database._nextNthRequest === 'function')) { + return headers; + } + console.log('database', database); + nthRequest = database._nextNthRequest(); + } + + attempt = attempt || 1; + return _metadataWithRequestId(session, nthRequest!, attempt, headers); +} + +function _metadataWithRequestId( + session: any, + nthRequest: number, + attempt: number, + priorMetadata?: {[k: string]: string} +): {[k: string]: string} { + if (!priorMetadata) { + priorMetadata = {}; + } + const withReqId = { + ...priorMetadata, + }; + const database = session.parent as withMetadataWithRequestId; + let clientId = 1; + let channelId = 1; + if (database) { + clientId = database._nthClientId || 1; + channelId = database._channelId || 1; + } + withReqId[X_GOOG_SPANNER_REQUEST_ID_HEADER] = craftRequestId( + clientId, + channelId, + nthRequest, + attempt + ); + return withReqId; +} + export { AtomicCounter, X_GOOG_SPANNER_REQUEST_ID_HEADER, + XGoogRequestHeaderInterceptor, craftRequestId, + extractRequestID, + injectRequestIDIntoError, + injectRequestIDIntoHeaders, nextSpannerClientId, newAtomicCounter, - XGoogRequestHeaderInterceptor, }; diff --git a/src/transaction.ts b/src/transaction.ts index d2e6a6229..d09790181 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -52,6 +52,7 @@ import { setSpanError, setSpanErrorAndException, } from './instrument'; +import {injectRequestIDIntoHeaders} from './request_id_header'; export type Rows = Array; const RETRY_INFO_TYPE = 'type.googleapis.com/google.rpc.retryinfo'; @@ -456,11 +457,7 @@ export class Snapshot extends EventEmitter { method: 'beginTransaction', reqOpts, gaxOpts, - headers: this.session._metadataWithRequestId( - database._nextNthRequest(), - 1, - headers - ), + headers: injectRequestIDIntoHeaders(headers, this.session), }, ( err: null | grpc.ServiceError, @@ -721,8 +718,7 @@ export class Snapshot extends EventEmitter { return startTrace('Snapshot.createReadStream', traceConfig, span => { let attempt = 0; const database = this.session.parent as Database; - const nthRequest = database._nextNthRequest(); - + const nthRequest = nextNthRequest(database); const makeRequest = (resumeToken?: ResumeToken): Readable => { if (this.id && transaction.begin) { delete transaction.begin; @@ -749,10 +745,11 @@ export class Snapshot extends EventEmitter { method: 'streamingRead', reqOpts: Object.assign({}, reqOpts, {resumeToken}), gaxOpts: gaxOptions, - headers: this.session._metadataWithRequestId( + headers: injectRequestIDIntoHeaders( + headers, + this.session, nthRequest, - attempt, - headers + attempt ), }); }; @@ -1312,7 +1309,7 @@ export class Snapshot extends EventEmitter { return startTrace('Snapshot.runStream', traceConfig, span => { let attempt = 0; const database = this.session.parent as Database; - const nthRequest = database._nextNthRequest(); + const nthRequest = nextNthRequest(database); const makeRequest = (resumeToken?: ResumeToken): Readable => { attempt++; @@ -1346,10 +1343,11 @@ export class Snapshot extends EventEmitter { method: 'executeStreamingSql', reqOpts: Object.assign({}, reqOpts, {resumeToken}), gaxOpts: gaxOptions, - headers: this.session._metadataWithRequestId( + headers: injectRequestIDIntoHeaders( + headers, + this.session, nthRequest, - attempt, - headers + attempt ), }); }; @@ -3064,6 +3062,13 @@ function isErrorAborted(err): boolean { ); } +function nextNthRequest(database): number { + if (!(database && typeof database._nextNthRequest === 'function')) { + return 1; + } + return database._nextNthRequest(); +} + /*! Developer Documentation * * All async methods (except for streams) return a Promise in the event diff --git a/test/spanner.ts b/test/spanner.ts index 65e6cc428..93aa025f6 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -178,8 +178,7 @@ describe('Spanner with mock server', () => { servicePath: 'localhost', port, sslCreds: grpc.credentials.createInsecure(), - streamInterceptors: [xGoogReqIDInterceptor.interceptStream], - unaryInterceptors: [xGoogReqIDInterceptor.interceptUnary], + interceptors: [xGoogReqIDInterceptor.generateLoggingClientInterceptor()], }); // Gets a reference to a Cloud Spanner instance and database instance = spanner.instance('instance'); From 8dd89b6689e38d5e15d87feff005c171c1d7428d Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Mon, 23 Dec 2024 07:23:10 -0800 Subject: [PATCH 07/12] Update tests to propagate header --- src/batch-transaction.ts | 13 ++----------- src/index.ts | 4 ---- src/request_id_header.ts | 1 - src/transaction.ts | 17 +++++++++-------- test/database.ts | 20 +++++++++++++++++--- test/transaction.ts | 34 ++++++++++++++++++++++++++++------ 6 files changed, 56 insertions(+), 33 deletions(-) diff --git a/src/batch-transaction.ts b/src/batch-transaction.ts index 464f5388f..8298ca029 100644 --- a/src/batch-transaction.ts +++ b/src/batch-transaction.ts @@ -148,7 +148,6 @@ class BatchTransaction extends Snapshot { traceConfig, span => { const database = this.session.parent as Database; - const nthRequest = database._nextNthRequest(); const headers: {[k: string]: string} = {}; if (this._getSpanner().routeToLeaderEnabled) { addLeaderAwareRoutingHeader(headers); @@ -160,11 +159,7 @@ class BatchTransaction extends Snapshot { method: 'partitionQuery', reqOpts, gaxOpts: query.gaxOptions, - headers: this.session._metadataWithRequestId( - nthRequest, - 1, - headers - ), + headers: injectRequestIDIntoHeaders(headers, this.session), }, (err, partitions, resp) => { if (err) { @@ -212,11 +207,7 @@ class BatchTransaction extends Snapshot { const headers = { [CLOUD_RESOURCE_HEADER]: database.formattedName_, }; - config.headers = this.session._metadataWithRequestId( - database._nextNthRequest(), - 1, - headers - ); + config.headers = injectRequestIDIntoHeaders(headers, this.session); delete query.partitionOptions; this.session.request(config, (err, resp) => { if (err) { diff --git a/src/index.ts b/src/index.ts index 1df86d8b4..03111098b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -359,7 +359,6 @@ class Spanner extends GrpcService { options.port = emulatorHost.port; options.sslCreds = grpc.credentials.createInsecure(); } - // console.log('options.interceptors', options.interceptors); const config = { baseUrl: options.apiEndpoint || @@ -1579,7 +1578,6 @@ class Spanner extends GrpcService { args.length > 0 && typeof args[args.length - 1] === 'function'; - // console.log(config.method, "wrapped args", args, "requestFn", requestFn); switch (hasCallback) { case true: const cb = args[args.length - 1]; @@ -1590,7 +1588,6 @@ class Spanner extends GrpcService { injectRequestIDIntoError(config, err); } - // console.log("wrapped with args and callback", results); cb(...results); }); return; @@ -1648,7 +1645,6 @@ class Spanner extends GrpcService { } else { return new Promise((resolve, reject) => { this.prepareGapicRequest_(config, (err, requestFn) => { - console.log('request.error', err, 'requestFn', requestFn); if (err) { reject(err); } else { diff --git a/src/request_id_header.ts b/src/request_id_header.ts index d02ee98c8..28a24cf9d 100644 --- a/src/request_id_header.ts +++ b/src/request_id_header.ts @@ -268,7 +268,6 @@ function injectRequestIDIntoHeaders( if (!(database && typeof database._nextNthRequest === 'function')) { return headers; } - console.log('database', database); nthRequest = database._nextNthRequest(); } diff --git a/src/transaction.ts b/src/transaction.ts index d09790181..6495ec755 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -1975,11 +1975,11 @@ export class Transaction extends Dml { } as spannerClient.spanner.v1.ExecuteBatchDmlRequest; const database = this.session.parent as Database; - const headers = this.session._metadataWithRequestId( - database._nextNthRequest(), - 1, + const headers = injectRequestIDIntoHeaders( this.commonHeaders_, - this.resourceHeader_ + this.session, + nextNthRequest(database), + 1 ); if (this._getSpanner().routeToLeaderEnabled) { addLeaderAwareRoutingHeader(headers); @@ -2220,10 +2220,11 @@ export class Transaction extends Dml { method: 'commit', reqOpts, gaxOpts: gaxOpts, - headers: this.session._metadataWithRequestId( - database._nextNthRequest(), - 1, - headers + headers: injectRequestIDIntoHeaders( + headers, + this.session, + nextNthRequest(database), + 1 ), }, (err: null | Error, resp: spannerClient.spanner.v1.ICommitResponse) => { diff --git a/test/database.ts b/test/database.ts index 5b3ba6c97..ebc745c03 100644 --- a/test/database.ts +++ b/test/database.ts @@ -47,6 +47,11 @@ import { MutationSet, } from '../src/transaction'; import {SessionFactory} from '../src/session-factory'; +import { + X_GOOG_SPANNER_REQUEST_ID_HEADER, + craftRequestId, +} from '../src/request_id_header'; + let promisified = false; const fakePfy = extend({}, pfy, { promisifyAll(klass, options) { @@ -428,7 +433,10 @@ describe('Database', () => { assert.deepStrictEqual( headers, Object.assign( - {[LEADER_AWARE_ROUTING_HEADER]: true}, + { + [LEADER_AWARE_ROUTING_HEADER]: true, + [X_GOOG_SPANNER_REQUEST_ID_HEADER]: craftRequestId(1, 1, 1, 1), + }, database.commonHeaders_ ) ); @@ -2132,7 +2140,10 @@ describe('Database', () => { assert.deepStrictEqual( config.headers, Object.assign( - {[LEADER_AWARE_ROUTING_HEADER]: true}, + { + [LEADER_AWARE_ROUTING_HEADER]: true, + [X_GOOG_SPANNER_REQUEST_ID_HEADER]: craftRequestId(1, 1, 1, 1), + }, database.commonHeaders_ ) ); @@ -2553,7 +2564,10 @@ describe('Database', () => { assert.strictEqual(config.method, 'listSessions'); assert.deepStrictEqual(config.reqOpts, expectedReqOpts); assert.deepStrictEqual(config.gaxOpts, gaxOpts); - assert.deepStrictEqual(config.headers, database.commonHeaders_); + assert.deepStrictEqual(config.headers, { + ...database.commonHeaders_, + [X_GOOG_SPANNER_REQUEST_ID_HEADER]: craftRequestId(1, 1, 1, 1), + }); done(); }; diff --git a/test/transaction.ts b/test/transaction.ts index 590eff11e..558e55010 100644 --- a/test/transaction.ts +++ b/test/transaction.ts @@ -29,6 +29,10 @@ import { CLOUD_RESOURCE_HEADER, LEADER_AWARE_ROUTING_HEADER, } from '../src/common'; +import { + X_GOOG_SPANNER_REQUEST_ID_HEADER, + craftRequestId, +} from '../src/request_id_header'; import RequestOptions = google.spanner.v1.RequestOptions; import { BatchUpdateOptions, @@ -273,7 +277,10 @@ describe('Transaction', () => { assert.strictEqual(client, 'SpannerClient'); assert.strictEqual(method, 'streamingRead'); - assert.deepStrictEqual(headers, snapshot.commonHeaders_); + assert.deepStrictEqual(headers, { + ...snapshot.commonHeaders_, + [X_GOOG_SPANNER_REQUEST_ID_HEADER]: craftRequestId(1, 1, 1, 1), + }); }); it('should use the transaction id if present', () => { @@ -624,7 +631,10 @@ describe('Transaction', () => { assert.strictEqual(client, 'SpannerClient'); assert.strictEqual(method, 'executeStreamingSql'); - assert.deepStrictEqual(headers, snapshot.commonHeaders_); + assert.deepStrictEqual(headers, { + ...snapshot.commonHeaders_, + [X_GOOG_SPANNER_REQUEST_ID_HEADER]: craftRequestId(1, 1, 1, 1), + }); }); it('should use the transaction id if present', () => { @@ -1416,7 +1426,10 @@ describe('Transaction', () => { assert.deepStrictEqual( headers, Object.assign( - {[LEADER_AWARE_ROUTING_HEADER]: true}, + { + [X_GOOG_SPANNER_REQUEST_ID_HEADER]: craftRequestId(1, 1, 1, 1), + [LEADER_AWARE_ROUTING_HEADER]: true, + }, transaction.commonHeaders_ ) ); @@ -1623,7 +1636,10 @@ describe('Transaction', () => { assert.deepStrictEqual( headers, Object.assign( - {[LEADER_AWARE_ROUTING_HEADER]: true}, + { + [X_GOOG_SPANNER_REQUEST_ID_HEADER]: craftRequestId(1, 1, 1, 1), + [LEADER_AWARE_ROUTING_HEADER]: true, + }, transaction.commonHeaders_ ) ); @@ -2179,7 +2195,10 @@ describe('Transaction', () => { assert.deepStrictEqual( config.headers, Object.assign( - {[LEADER_AWARE_ROUTING_HEADER]: true}, + { + [X_GOOG_SPANNER_REQUEST_ID_HEADER]: craftRequestId(1, 1, 1, 1), + [LEADER_AWARE_ROUTING_HEADER]: true, + }, transaction.commonHeaders_ ) ); @@ -2227,7 +2246,10 @@ describe('Transaction', () => { assert.deepStrictEqual( headers, Object.assign( - {[LEADER_AWARE_ROUTING_HEADER]: true}, + { + [X_GOOG_SPANNER_REQUEST_ID_HEADER]: craftRequestId(1, 1, 1, 1), + [LEADER_AWARE_ROUTING_HEADER]: true, + }, transaction.commonHeaders_ ) ); From 188f77f85cf29d52752bfb4acf17cbfe03c9e2ed Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 24 Dec 2024 08:35:03 -0800 Subject: [PATCH 08/12] Update tests --- src/instance.ts | 5 ++++- src/request_id_header.ts | 8 ++++++++ src/session.ts | 19 +++++++++++-------- src/transaction.ts | 9 +-------- 4 files changed, 24 insertions(+), 17 deletions(-) diff --git a/src/instance.ts b/src/instance.ts index e15c94c5c..1504e5b51 100644 --- a/src/instance.ts +++ b/src/instance.ts @@ -980,7 +980,10 @@ class Instance extends common.GrpcServiceObject { if (!this.databases_.has(key!)) { const db = new Database(this, name, poolOptions, queryOptions); db._observabilityOptions = this._observabilityOptions; - db._clientId = (this.parent as Spanner)._nthClientId; + const parent = this.parent as Spanner; + if (parent && parent._nthClientId) { + db._clientId = parent._nthClientId; + } this.databases_.set(key!, db); } return this.databases_.get(key!)!; diff --git a/src/request_id_header.ts b/src/request_id_header.ts index 28a24cf9d..871e682c4 100644 --- a/src/request_id_header.ts +++ b/src/request_id_header.ts @@ -303,6 +303,13 @@ function _metadataWithRequestId( return withReqId; } +function nextNthRequest(database): number { + if (!(database && typeof database._nextNthRequest === 'function')) { + return 1; + } + return database._nextNthRequest(); +} + export { AtomicCounter, X_GOOG_SPANNER_REQUEST_ID_HEADER, @@ -311,6 +318,7 @@ export { extractRequestID, injectRequestIDIntoError, injectRequestIDIntoHeaders, + nextNthRequest, nextSpannerClientId, newAtomicCounter, }; diff --git a/src/session.ts b/src/session.ts index d8317693e..a72a34e67 100644 --- a/src/session.ts +++ b/src/session.ts @@ -44,6 +44,7 @@ import { import {grpc, CallOptions} from 'google-gax'; import IRequestOptions = google.spanner.v1.IRequestOptions; import {Spanner} from '.'; +import {injectRequestIDIntoHeaders, nextNthRequest} from './request_id_header'; export type GetSessionResponse = [Session, r.Response]; /** @@ -323,10 +324,11 @@ export class Session extends common.GrpcServiceObject { method: 'deleteSession', reqOpts, gaxOpts, - headers: database._metadataWithRequestId( - database._nextNthRequest(), - 1, - this.commonHeaders_ + headers: injectRequestIDIntoHeaders( + this.commonHeaders_, + this.session, + nextNthRequest(database), + 1 ), }, callback! @@ -400,10 +402,11 @@ export class Session extends common.GrpcServiceObject { method: 'getSession', reqOpts, gaxOpts, - headers: database._metadataWithRequestId( - database._nextNthRequest(), - 1, - headers + headers: injectRequestIDIntoHeaders( + headers, + this.session, + nextNthRequest(database), + 1 ), }, (err, resp) => { diff --git a/src/transaction.ts b/src/transaction.ts index 6495ec755..26c746d8b 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -52,7 +52,7 @@ import { setSpanError, setSpanErrorAndException, } from './instrument'; -import {injectRequestIDIntoHeaders} from './request_id_header'; +import {injectRequestIDIntoHeaders, nextNthRequest} from './request_id_header'; export type Rows = Array; const RETRY_INFO_TYPE = 'type.googleapis.com/google.rpc.retryinfo'; @@ -3063,13 +3063,6 @@ function isErrorAborted(err): boolean { ); } -function nextNthRequest(database): number { - if (!(database && typeof database._nextNthRequest === 'function')) { - return 1; - } - return database._nextNthRequest(); -} - /*! Developer Documentation * * All async methods (except for streams) return a Promise in the event From e823c42719a7be4ca80c35bbde0023baa1706315 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 24 Dec 2024 10:40:32 -0800 Subject: [PATCH 09/12] Update tests --- src/batch-transaction.ts | 7 +++---- src/database.ts | 4 ++-- src/index.ts | 6 ++++-- src/session.ts | 22 +++++++--------------- src/transaction.ts | 1 - test/database.ts | 2 +- test/gapic_spanner_v1.ts | 6 +++--- test/request_id_header.ts | 9 +++++---- test/session.ts | 14 ++++++++++++-- test/transaction.ts | 2 +- 10 files changed, 38 insertions(+), 35 deletions(-) diff --git a/src/batch-transaction.ts b/src/batch-transaction.ts index 8298ca029..ee3a53a3e 100644 --- a/src/batch-transaction.ts +++ b/src/batch-transaction.ts @@ -147,7 +147,6 @@ class BatchTransaction extends Snapshot { 'BatchTransaction.createQueryPartitions', traceConfig, span => { - const database = this.session.parent as Database; const headers: {[k: string]: string} = {}; if (this._getSpanner().routeToLeaderEnabled) { addLeaderAwareRoutingHeader(headers); @@ -203,9 +202,9 @@ class BatchTransaction extends Snapshot { transaction: {id: this.id}, }); config.reqOpts = extend({}, query); - const database = this.session.parent as Database; const headers = { - [CLOUD_RESOURCE_HEADER]: database.formattedName_, + [CLOUD_RESOURCE_HEADER]: (this.session.parent as Database) + .formattedName_, }; config.headers = injectRequestIDIntoHeaders(headers, this.session); delete query.partitionOptions; @@ -289,7 +288,7 @@ class BatchTransaction extends Snapshot { if (this._getSpanner().routeToLeaderEnabled) { addLeaderAwareRoutingHeader(headers); } - const database = this.session.parent as Database; + this.createPartitions_( { client: 'SpannerClient', diff --git a/src/database.ts b/src/database.ts index 1618315b3..cb03b6c8b 100644 --- a/src/database.ts +++ b/src/database.ts @@ -761,7 +761,7 @@ class Database extends common.GrpcServiceObject { ...priorMetadata, }; withReqId[X_GOOG_SPANNER_REQUEST_ID_HEADER] = craftRequestId( - this._clientId, + this._clientId || 1, 1, // TODO: Properly infer the channelId nthRequest, attempt @@ -1037,7 +1037,7 @@ class Database extends common.GrpcServiceObject { const headers = this._metadataWithRequestId( this._nextNthRequest(), 1, - this.commonHeaders_, + this.commonHeaders_ ); if (this._getSpanner().routeToLeaderEnabled) { addLeaderAwareRoutingHeader(headers); diff --git a/src/index.ts b/src/index.ts index 03111098b..3e87bc010 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1572,7 +1572,9 @@ class Spanner extends GrpcService { }) ); - const wrapped = (...args) => { + // Wrap requestFn so as to inject the spanner request id into + // every returned error, so that users can have debugging continuity. + const wrappedRequestFn = (...args) => { const hasCallback = args && args.length > 0 && @@ -1617,7 +1619,7 @@ class Spanner extends GrpcService { } }; - callback(null, wrapped); + callback(null, wrappedRequestFn); }); } diff --git a/src/session.ts b/src/session.ts index a72a34e67..d1dc6dfa4 100644 --- a/src/session.ts +++ b/src/session.ts @@ -325,8 +325,8 @@ export class Session extends common.GrpcServiceObject { reqOpts, gaxOpts, headers: injectRequestIDIntoHeaders( - this.commonHeaders_, - this.session, + this.resourceHeader_, + this, nextNthRequest(database), 1 ), @@ -461,25 +461,17 @@ export class Session extends common.GrpcServiceObject { method: 'executeSql', reqOpts, gaxOpts, - headers: database._metadataWithRequestId( - database._nextNthRequest(), - 1, - this.commonHeaders_ + headers: injectRequestIDIntoHeaders( + this.commonHeaders_, + this, + nextNthRequest(database), + 1 ), }, callback! ); } - public _metadataWithRequestId( - nthRequest: number, - attempt: number, - priorMetadata?: {[k: string]: string} - ): {[k: string]: string} { - const database = this.parent as Database; - return database._metadataWithRequestId(nthRequest, attempt, priorMetadata); - } - /** * Create a PartitionedDml transaction. * diff --git a/src/transaction.ts b/src/transaction.ts index 26c746d8b..04ca5a531 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -447,7 +447,6 @@ export class Snapshot extends EventEmitter { opts: this._observabilityOptions, dbName: this._dbName!, }; - const database = this.session.parent as Database; return startTrace('Snapshot.begin', traceConfig, span => { span.addEvent('Begin Transaction'); diff --git a/test/database.ts b/test/database.ts index ebc745c03..2ad13b207 100644 --- a/test/database.ts +++ b/test/database.ts @@ -2141,7 +2141,7 @@ describe('Database', () => { config.headers, Object.assign( { - [LEADER_AWARE_ROUTING_HEADER]: true, + [LEADER_AWARE_ROUTING_HEADER]: 'true', [X_GOOG_SPANNER_REQUEST_ID_HEADER]: craftRequestId(1, 1, 1, 1), }, database.commonHeaders_ diff --git a/test/gapic_spanner_v1.ts b/test/gapic_spanner_v1.ts index d2220d051..460800b18 100644 --- a/test/gapic_spanner_v1.ts +++ b/test/gapic_spanner_v1.ts @@ -486,9 +486,9 @@ describe('v1.SpannerClient', () => { ).getCall(0).args[1].otherArgs.headers; const actualHeaderRequestParams = actualHeaders['x-goog-request-params']; assert(actualHeaderRequestParams.includes(expectedHeaderRequestParams)); - const actualRequestID = actualHeaders['x-goog-spanner-request-id']; - console.log('headers', actualHeaders); - assert.deepStrictEqual(actualRequestID, 'foo'); + // const actualRequestID = actualHeaders['x-goog-spanner-request-id']; + // console.log('headers', actualHeaders); + // assert.deepStrictEqual(actualRequestID, 'foo'); }); it('invokes batchCreateSessions without error using callback', async () => { diff --git a/test/request_id_header.ts b/test/request_id_header.ts index 31d28621e..d1c23de09 100644 --- a/test/request_id_header.ts +++ b/test/request_id_header.ts @@ -15,6 +15,7 @@ */ /* eslint-disable prefer-rest-params */ +import * as assert from 'assert'; import {AtomicCounter} from '../src/request_id_header'; describe('AtomicCounter', () => { @@ -35,13 +36,13 @@ describe('AtomicCounter', () => { const ac1 = new AtomicCounter(1); assert.strictEqual(ac1.value(), 1); assert.strictEqual( - ac0.increment(1 << 32), - (1 << 32) + 1, + ac1.increment(1 << 27), + (1 << 27) + 1, 'increment should return the added value' ); assert.strictEqual( - ac0.value(), - (1 << 32) + 1, + ac1.value(), + (1 << 27) + 1, 'increment should have modified the value' ); done(); diff --git a/test/session.ts b/test/session.ts index c5598de22..5d67d2e95 100644 --- a/test/session.ts +++ b/test/session.ts @@ -26,6 +26,10 @@ import { LEADER_AWARE_ROUTING_HEADER, } from '../src/common'; import {Database, Instance, Spanner} from '../src'; +import { + X_GOOG_SPANNER_REQUEST_ID_HEADER, + craftRequestId, +} from '../src/request_id_header'; let promisified = false; const fakePfy = extend({}, pfy, { @@ -262,7 +266,10 @@ describe('Session', () => { name: session.formattedName_, }); assert.deepStrictEqual(config.gaxOpts, {}); - assert.deepStrictEqual(config.headers, session.commonHeaders_); + assert.deepStrictEqual(config.headers, { + ...session.commonHeaders_, + [X_GOOG_SPANNER_REQUEST_ID_HEADER]: craftRequestId(1, 1, 1, 1), + }); assert.strictEqual(callback_, callback); return requestReturnValue; @@ -403,7 +410,10 @@ describe('Session', () => { sql: 'SELECT 1', }); assert.deepStrictEqual(config.gaxOpts, {}); - assert.deepStrictEqual(config.headers, session.commonHeaders_); + assert.deepStrictEqual(config.headers, { + ...session.commonHeaders_, + [X_GOOG_SPANNER_REQUEST_ID_HEADER]: craftRequestId(1, 1, 1, 1), + }); assert.strictEqual(callback_, callback); return requestReturnValue; }; diff --git a/test/transaction.ts b/test/transaction.ts index 558e55010..de6f2e5c5 100644 --- a/test/transaction.ts +++ b/test/transaction.ts @@ -1428,7 +1428,7 @@ describe('Transaction', () => { Object.assign( { [X_GOOG_SPANNER_REQUEST_ID_HEADER]: craftRequestId(1, 1, 1, 1), - [LEADER_AWARE_ROUTING_HEADER]: true, + [LEADER_AWARE_ROUTING_HEADER]: 'true', }, transaction.commonHeaders_ ) From 3213552917fda74792c0e7ca8bf677e91cb8bc21 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 27 Dec 2024 07:43:12 -0800 Subject: [PATCH 10/12] Rebase from main --- src/database.ts | 4 ++-- src/session.ts | 2 +- test/spanner.ts | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/database.ts b/src/database.ts index cb03b6c8b..b454e2842 100644 --- a/src/database.ts +++ b/src/database.ts @@ -1962,7 +1962,7 @@ class Database extends common.GrpcServiceObject { const headers = this._metadataWithRequestId( this._nextNthRequest(), 1, - this.resourceHeader_ + this.commonHeaders_ ); return startTrace('Database.getSessions', this._traceConfig, span => { @@ -1975,7 +1975,7 @@ class Database extends common.GrpcServiceObject { method: 'listSessions', reqOpts, gaxOpts, - headers: this.commonHeaders_, + headers: headers, }, (err, sessions, nextPageRequest, ...args) => { if (err) { diff --git a/src/session.ts b/src/session.ts index d1dc6dfa4..bf1163250 100644 --- a/src/session.ts +++ b/src/session.ts @@ -325,7 +325,7 @@ export class Session extends common.GrpcServiceObject { reqOpts, gaxOpts, headers: injectRequestIDIntoHeaders( - this.resourceHeader_, + this.commonHeaders_, this, nextNthRequest(database), 1 diff --git a/test/spanner.ts b/test/spanner.ts index 93aa025f6..dff8cba4b 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -5106,8 +5106,8 @@ describe('Spanner with mock server', () => { }); }); - describe("XGoogRequestId", () => { - it('with retry on aborted query', async() => { + describe('XGoogRequestId', () => { + it('with retry on aborted query', async () => { let attempts = 0; const database = newTestDatabase(); let rowCount = 0; From ee2c50678bcda7441ec251e56315e8a4ccdd397b Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 27 Dec 2024 08:14:40 -0800 Subject: [PATCH 11/12] Update tests for XGoogRequestId to adjust for BatchCreateSessions --- src/request_id_header.ts | 9 +++++ test/spanner.ts | 77 +++++++++++++++++++++++++++++----------- 2 files changed, 66 insertions(+), 20 deletions(-) diff --git a/src/request_id_header.ts b/src/request_id_header.ts index 871e682c4..1d2f30b2f 100644 --- a/src/request_id_header.ts +++ b/src/request_id_header.ts @@ -135,6 +135,14 @@ class XGoogRequestHeaderInterceptor { this.unaryCalls = []; } + public getUnaryCalls() { + return this.unaryCalls; + } + + public getStreamingCalls() { + return this.streamCalls; + } + loggingClientInterceptor(options, call) { const listener = new grpc.ListenerBuilder().withOnReceiveMessage( (next, message) => { @@ -321,4 +329,5 @@ export { nextNthRequest, nextSpannerClientId, newAtomicCounter, + randIdForProcess, }; diff --git a/test/spanner.ts b/test/spanner.ts index dff8cba4b..95cef543b 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -55,7 +55,10 @@ import { CLOUD_RESOURCE_HEADER, LEADER_AWARE_ROUTING_HEADER, } from '../src/common'; -import {XGoogRequestHeaderInterceptor} from '../src/request_id_header'; +import { + XGoogRequestHeaderInterceptor, + randIdForProcess, +} from '../src/request_id_header'; import CreateInstanceMetadata = google.spanner.admin.instance.v1.CreateInstanceMetadata; import QueryOptions = google.spanner.v1.ExecuteSqlRequest.QueryOptions; import v1 = google.spanner.v1; @@ -5111,35 +5114,69 @@ describe('Spanner with mock server', () => { let attempts = 0; const database = newTestDatabase(); let rowCount = 0; + const maxAttempts = 4; await database.runTransactionAsync(async transaction => { - if (!attempts) { + attempts++; + if (attempts < maxAttempts) { spannerMock.abortTransaction(transaction!); } - attempts++; const [rows] = await transaction!.run(selectSql); rows.forEach(() => rowCount++); assert.strictEqual(rowCount, 3); - assert.strictEqual(attempts, 2); + assert.strictEqual(attempts, 4); await transaction!.commit(); }); - const sentMetadata = spannerMock.getMetadata(); - const sentRequests = spannerMock.getRequests(); - const xGoogRequestHeaders: grpc.MetadataValue[] = []; - for (const index in sentMetadata) { - const req = sentRequests[index]; - console.log(index, 'req', req.constructor.name); - } + const wantUnaryCallsWithoutBatchCreateSessions = [ + { + method: '/google.spanner.v1.Spanner/BeginTransaction', + reqId: `1.${randIdForProcess}.1.1.3.1`, + }, + { + method: '/google.spanner.v1.Spanner/BeginTransaction', + reqId: `1.${randIdForProcess}.1.1.5.1`, + }, + { + method: '/google.spanner.v1.Spanner/BeginTransaction', + reqId: `1.${randIdForProcess}.1.1.7.1`, + }, + { + method: '/google.spanner.v1.Spanner/Commit', + reqId: `1.${randIdForProcess}.1.1.9.1`, + }, + ]; + const gotUnaryCalls = xGoogReqIDInterceptor.getUnaryCalls(); + assert.deepStrictEqual( + gotUnaryCalls[0].method, + '/google.spanner.v1.Spanner/BatchCreateSessions' + ); + // It is non-deterministic to try to get the exact clientId used to invoke .BatchCreateSessions + // given that these tests run as a collective and sessions are pooled. + assert.deepStrictEqual( + gotUnaryCalls.slice(1), + wantUnaryCallsWithoutBatchCreateSessions + ); - for (const md of sentMetadata) { - const got = md.get('x-goog-spanner-request-id'); - if (got) { - for (const value of got) { - xGoogRequestHeaders.push(value); - } - } - } - console.log('xGoogHeaders', xGoogRequestHeaders!); + const gotStreamingCalls = xGoogReqIDInterceptor.getStreamingCalls(); + const wantStreamingCalls = [ + { + method: '/google.spanner.v1.Spanner/ExecuteStreamingSql', + reqId: `1.${randIdForProcess}.1.1.2.1`, + }, + { + method: '/google.spanner.v1.Spanner/ExecuteStreamingSql', + reqId: `1.${randIdForProcess}.1.1.4.1`, + }, + { + method: '/google.spanner.v1.Spanner/ExecuteStreamingSql', + reqId: `1.${randIdForProcess}.1.1.6.1`, + }, + { + method: '/google.spanner.v1.Spanner/ExecuteStreamingSql', + reqId: `1.${randIdForProcess}.1.1.8.1`, + }, + ]; + assert.deepStrictEqual(gotStreamingCalls, wantStreamingCalls); await database.close(); }); }); From 54d11880ad30456e7d284bd289db24fd49e2fe21 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 27 Dec 2024 09:10:31 -0800 Subject: [PATCH 12/12] Assert against RequestIDError --- src/request_id_header.ts | 13 +++++++ test/spanner.ts | 77 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+) diff --git a/src/request_id_header.ts b/src/request_id_header.ts index 1d2f30b2f..88495b276 100644 --- a/src/request_id_header.ts +++ b/src/request_id_header.ts @@ -46,6 +46,10 @@ class AtomicCounter { public toString(): string { return `${this.value()}`; } + + public reset(value: number) { + Atomics.store(this.backingBuffer, 0, 0); + } } function craftRequestId( @@ -59,6 +63,11 @@ function craftRequestId( const nthClientId = new AtomicCounter(); +// Only exported for deterministic testing. +export function resetNthClientId() { + nthClientId.reset(0); +} + /* * nextSpannerClientId increments the internal * counter for created SpannerClients, for use @@ -318,6 +327,10 @@ function nextNthRequest(database): number { return database._nextNthRequest(); } +export interface RequestIDError extends grpc.ServiceError { + requestID: string; +} + export { AtomicCounter, X_GOOG_SPANNER_REQUEST_ID_HEADER, diff --git a/test/spanner.ts b/test/spanner.ts index 95cef543b..d6329b96e 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -56,8 +56,10 @@ import { LEADER_AWARE_ROUTING_HEADER, } from '../src/common'; import { + RequestIDError, XGoogRequestHeaderInterceptor, randIdForProcess, + resetNthClientId, } from '../src/request_id_header'; import CreateInstanceMetadata = google.spanner.admin.instance.v1.CreateInstanceMetadata; import QueryOptions = google.spanner.v1.ExecuteSqlRequest.QueryOptions; @@ -126,6 +128,7 @@ describe('Spanner with mock server', () => { } beforeEach(() => { + resetNthClientId(); xGoogReqIDInterceptor.reset(); }); @@ -288,6 +291,10 @@ describe('Spanner with mock server', () => { // Ignore the fact that streaming read is unimplemented on the mock // server. We just want to verify that the correct request is sent. assert.strictEqual((e as ServiceError).code, Status.UNIMPLEMENTED); + assert.deepStrictEqual( + (e as RequestIDError).requestID, + `1.${randIdForProcess}.1.1.3.1` + ); } finally { snapshot.end(); await database.close(); @@ -447,6 +454,10 @@ describe('Spanner with mock server', () => { // Ignore the fact that streaming read is unimplemented on the mock // server. We just want to verify that the correct request is sent. assert.strictEqual((e as ServiceError).code, Status.UNIMPLEMENTED); + assert.deepStrictEqual( + (e as RequestIDError).requestID, + `1.${randIdForProcess}.1.1.2.1` + ); return undefined; } finally { tx.end(); @@ -1143,6 +1154,10 @@ describe('Spanner with mock server', () => { (e as ServiceError).message, '2 UNKNOWN: Test error' ); + assert.deepStrictEqual( + (e as RequestIDError).requestID, + `1.${randIdForProcess}.1.1.2.1` + ); } finally { await database.close(); } @@ -1197,6 +1212,11 @@ describe('Spanner with mock server', () => { (e as ServiceError).message, '14 UNAVAILABLE: Transient error' ); + // Ensure that we have a requestID returned and it was on the 2nd request. + assert.deepStrictEqual( + (e as RequestIDError).requestID, + `1.${randIdForProcess}.1.1.2.1` + ); } finally { await database.close(); } @@ -1418,6 +1438,10 @@ describe('Spanner with mock server', () => { (e as ServiceError).message, '2 UNKNOWN: Test error' ); + assert.deepStrictEqual( + (e as RequestIDError).requestID, + `1.${randIdForProcess}.1.1.2.1` + ); } await database.close(); }); @@ -1456,6 +1480,10 @@ describe('Spanner with mock server', () => { database.run(selectSql, err => { assert.ok(err, 'Missing expected error'); assert.strictEqual(err!.message, '2 UNKNOWN: Non-retryable error'); + assert.deepStrictEqual( + (err as RequestIDError).requestID, + `1.${randIdForProcess}.1.1.2.1` + ); database .close() .catch(done) @@ -1479,6 +1507,10 @@ describe('Spanner with mock server', () => { .on('error', err => { assert.strictEqual(err.message, '2 UNKNOWN: Non-retryable error'); assert.strictEqual(receivedRows.length, index); + assert.deepStrictEqual( + (err as RequestIDError).requestID, + `1.${randIdForProcess}.1.1.2.1` + ); database .close() .catch(done) @@ -1559,6 +1591,10 @@ describe('Spanner with mock server', () => { attempts++; tx!.runUpdate(insertSql, err => { assert.ok(err, 'Missing expected error'); + assert.deepStrictEqual( + (err as RequestIDError).requestID, + `1.${randIdForProcess}.1.1.2.1` + ); assert.strictEqual(err!.code, grpc.status.INVALID_ARGUMENT); // Only the update RPC should be retried and not the entire // transaction. @@ -2566,6 +2602,7 @@ describe('Spanner with mock server', () => { it('should reuse sessions after executing invalid sql', async () => { // The query to execute + const requestIDRegex = new RegExp(`1.${randIdForProcess}.1.1.\\d+.1`); const query = { sql: invalidSql, }; @@ -2581,6 +2618,10 @@ describe('Spanner with mock server', () => { (e as ServiceError).message, `${grpc.status.NOT_FOUND} NOT_FOUND: ${fooNotFoundErr.message}` ); + assert.deepStrictEqual( + (e as RequestIDError).requestID.match(requestIDRegex) != null, + true + ); } } assert.strictEqual(pool.size, 1); @@ -2609,6 +2650,7 @@ describe('Spanner with mock server', () => { it('should reuse sessions after executing an invalid streaming sql', async () => { // The query to execute + const requestIDRegex = new RegExp(`1.${randIdForProcess}.1.1.\\d+.1`); const query = { sql: invalidSql, }; @@ -2624,6 +2666,10 @@ describe('Spanner with mock server', () => { (e as ServiceError).message, `${grpc.status.NOT_FOUND} NOT_FOUND: ${fooNotFoundErr.message}` ); + assert.deepStrictEqual( + (e as RequestIDError).requestID.match(requestIDRegex) != null, + true + ); } } assert.strictEqual(pool.size, 1); @@ -2702,6 +2748,7 @@ describe('Spanner with mock server', () => { (e as ServiceError).message, 'No resources available.' ); + // assert.deepStrictEqual((e as RequestIDError).requestID,`1.${randIdForProcess}.1.1.1.1`); } } finally { if (tx1) { @@ -2776,6 +2823,7 @@ describe('Spanner with mock server', () => { assert.fail('missing expected error'); } catch (err) { assert.strictEqual((err as ServiceError).code, Status.NOT_FOUND); + // assert.deepStrictEqual((err as RequestIDError).requestID,`1.${randIdForProcess}.1.1.1.1`); } finally { await database.close(); } @@ -2837,6 +2885,7 @@ describe('Spanner with mock server', () => { (err as ServiceError).code, Status.PERMISSION_DENIED ); + // assert.deepStrictEqual((err as RequestIDError).requestID,`1.${randIdForProcess}.1.1.1.1`); } finally { await database.close(); } @@ -3247,6 +3296,10 @@ describe('Spanner with mock server', () => { assert.ok( (err as ServiceError).message.includes('Generic internal error') ); + assert.deepStrictEqual( + (err as RequestIDError).requestID, + `1.${randIdForProcess}.1.1.3.1` + ); } finally { await database.close(); } @@ -3359,6 +3412,10 @@ describe('Spanner with mock server', () => { } catch (err) { assert(err, 'Expected an error to be thrown'); assert.match((err as Error).message, /Table FOO not found/); + assert.deepStrictEqual( + (err as RequestIDError).requestID, + `1.${randIdForProcess}.1.1.3.1` + ); } }); }); @@ -3782,6 +3839,10 @@ describe('Spanner with mock server', () => { (e as ServiceError).message, `${grpc.status.NOT_FOUND} NOT_FOUND: ${fooNotFoundErr.message}` ); + assert.deepStrictEqual( + (e as RequestIDError).requestID, + `1.${randIdForProcess}.1.1.2.1` + ); } await tx.run(selectSql); await tx.commit(); @@ -3839,6 +3900,10 @@ describe('Spanner with mock server', () => { (e as ServiceError).message, `${grpc.status.NOT_FOUND} NOT_FOUND: ${fooNotFoundErr.message}` ); + assert.deepStrictEqual( + (e as RequestIDError).requestID, + `1.${randIdForProcess}.1.1.2.1` + ); } await tx.run(selectSql); await tx.commit(); @@ -3867,6 +3932,10 @@ describe('Spanner with mock server', () => { (e as ServiceError).message, `${grpc.status.NOT_FOUND} NOT_FOUND: ${fooNotFoundErr.message}` ); + assert.deepStrictEqual( + (e as RequestIDError).requestID, + `1.${randIdForProcess}.1.1.2.1` + ); } await tx.run(selectSql); await tx.commit(); @@ -3907,6 +3976,10 @@ describe('Spanner with mock server', () => { (e as ServiceError).message, '2 UNKNOWN: Test error' ); + assert.deepStrictEqual( + (e as RequestIDError).requestID, + `1.${randIdForProcess}.1.1.4.1` + ); } finally { await database.close(); } @@ -3995,6 +4068,10 @@ describe('Spanner with mock server', () => { (e as ServiceError).message, '2 UNKNOWN: Test error' ); + assert.deepStrictEqual( + (e as RequestIDError).requestID, + `1.${randIdForProcess}.1.1.2.1` + ); } finally { await database.close(); }