From 809227a1cbcf65aca52c010c9b04f5e8eb4b306d Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Sun, 22 Sep 2024 15:30:36 +0200 Subject: [PATCH] refactor: move more internals to `async`/`await` (#1665) --- src/connection.ts | 370 ++++++++++++++++++++++++---------------------- 1 file changed, 193 insertions(+), 177 deletions(-) diff --git a/src/connection.ts b/src/connection.ts index 8b2612053..d810acb16 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -1793,7 +1793,10 @@ class Connection extends EventEmitter { }; this._onSocketError = (error) => { - this.socketError(error); + this.dispatchEvent('socketError', error); + process.nextTick(() => { + this.emit('error', this.wrapSocketError(error)); + }); }; } @@ -1818,6 +1821,21 @@ class Connection extends EventEmitter { } this.transitionTo(this.STATE.CONNECTING); + this.initialiseConnection().then(() => { + process.nextTick(() => { + this.emit('connect'); + }); + }, (err) => { + this.transitionTo(this.STATE.FINAL); + this.closed = true; + + process.nextTick(() => { + this.emit('connect', err); + }); + process.nextTick(() => { + this.emit('end'); + }); + }); } /** @@ -1958,62 +1976,59 @@ class Connection extends EventEmitter { */ close() { this.transitionTo(this.STATE.FINAL); + this.cleanupConnection(); } /** * @private */ - initialiseConnection() { - (async () => { - const timeoutController = new AbortController(); - - const connectTimer = setTimeout(() => { - const hostPostfix = this.config.options.port ? `:${this.config.options.port}` : `\\${this.config.options.instanceName}`; - // If we have routing data stored, this connection has been redirected - const server = this.routingData ? this.routingData.server : this.config.server; - const port = this.routingData ? `:${this.routingData.port}` : hostPostfix; - // Grab the target host from the connection configuration, and from a redirect message - // otherwise, leave the message empty. - const routingMessage = this.routingData ? ` (redirected from ${this.config.server}${hostPostfix})` : ''; - const message = `Failed to connect to ${server}${port}${routingMessage} in ${this.config.options.connectTimeout}ms`; - this.debug.log(message); - - timeoutController.abort(new ConnectionError(message, 'ETIMEOUT')); - }, this.config.options.connectTimeout); + async initialiseConnection() { + const timeoutController = new AbortController(); - try { - let signal = timeoutController.signal; + const connectTimer = setTimeout(() => { + const hostPostfix = this.config.options.port ? `:${this.config.options.port}` : `\\${this.config.options.instanceName}`; + // If we have routing data stored, this connection has been redirected + const server = this.routingData ? this.routingData.server : this.config.server; + const port = this.routingData ? `:${this.routingData.port}` : hostPostfix; + // Grab the target host from the connection configuration, and from a redirect message + // otherwise, leave the message empty. + const routingMessage = this.routingData ? ` (redirected from ${this.config.server}${hostPostfix})` : ''; + const message = `Failed to connect to ${server}${port}${routingMessage} in ${this.config.options.connectTimeout}ms`; + this.debug.log(message); - let port = this.config.options.port; + timeoutController.abort(new ConnectionError(message, 'ETIMEOUT')); + }, this.config.options.connectTimeout); - if (!port) { - try { - port = await instanceLookup({ - server: this.config.server, - instanceName: this.config.options.instanceName!, - timeout: this.config.options.connectTimeout, - signal: signal - }); - } catch (err: any) { - if (signal.aborted) { - throw signal.reason; - } + try { + let signal = timeoutController.signal; - throw new ConnectionError(err.message, 'EINSTLOOKUP', { cause: err }); - } - } + let port = this.config.options.port; - let socket; + if (!port) { try { - socket = await this.connectOnPort(port, this.config.options.multiSubnetFailover, signal, this.config.options.connector); + port = await instanceLookup({ + server: this.config.server, + instanceName: this.config.options.instanceName!, + timeout: this.config.options.connectTimeout, + signal: signal + }); } catch (err: any) { - if (signal.aborted) { - throw signal.reason; - } + signal.throwIfAborted(); - throw this.wrapSocketError(err); + throw new ConnectionError(err.message, 'EINSTLOOKUP', { cause: err }); } + } + + let socket; + try { + socket = await this.connectOnPort(port, this.config.options.multiSubnetFailover, signal, this.config.options.connector); + } catch (err: any) { + signal.throwIfAborted(); + throw this.wrapSocketError(err); + } + + try { const controller = new AbortController(); const onError = (err: Error) => { controller.abort(this.wrapSocketError(err)); @@ -2033,9 +2048,9 @@ class Connection extends EventEmitter { socket.once('close', onClose); socket.once('end', onEnd); - signal = AbortSignal.any([signal, controller.signal]); - try { + signal = AbortSignal.any([signal, controller.signal]); + socket.setKeepAlive(true, KEEP_ALIVE_INITIAL_DELAY); this.messageIo = new MessageIO(socket, this.config.options.packetSize, this.debug); @@ -2049,7 +2064,8 @@ class Connection extends EventEmitter { this.sendPreLogin(); this.transitionTo(this.STATE.SENT_PRELOGIN); - await this.performSentPrelogin(signal); + const preloginResponse = await this.readPreloginResponse(signal); + await this.performTlsNegotiation(preloginResponse, signal); this.sendLogin7Packet(); @@ -2078,8 +2094,7 @@ class Connection extends EventEmitter { if (isTransientError(err)) { this.debug.log('Initiating retry on transient error'); this.transitionTo(this.STATE.TRANSIENT_FAILURE_RETRY); - this.performTransientFailureRetry(); - return; + return await this.performTransientFailureRetry(); } throw err; @@ -2088,8 +2103,7 @@ class Connection extends EventEmitter { // If routing data is present, we need to re-route the connection if (this.routingData) { this.transitionTo(this.STATE.REROUTING); - this.performReRouting(); - return; + return await this.performReRouting(); } this.transitionTo(this.STATE.LOGGED_IN_SENDING_INITIAL_SQL); @@ -2099,26 +2113,20 @@ class Connection extends EventEmitter { socket.removeListener('close', onClose); socket.removeListener('end', onEnd); } + } catch (err) { + socket.destroy(); - socket.on('error', this._onSocketError); - socket.on('close', this._onSocketClose); - socket.on('end', this._onSocketEnd); - - this.transitionTo(this.STATE.LOGGED_IN); - - process.nextTick(() => { - this.emit('connect'); - }); - } finally { - clearTimeout(connectTimer); + throw err; } - })().catch((err) => { - this.transitionTo(this.STATE.FINAL); - process.nextTick(() => { - this.emit('connect', err); - }); - }); + socket.on('error', this._onSocketError); + socket.on('close', this._onSocketClose); + socket.on('end', this._onSocketEnd); + + this.transitionTo(this.STATE.LOGGED_IN); + } finally { + clearTimeout(connectTimer); + } } /** @@ -2141,7 +2149,6 @@ class Connection extends EventEmitter { } this.closed = true; - this.loginError = undefined; } } @@ -2163,59 +2170,50 @@ class Connection extends EventEmitter { return new TokenStreamParser(message, this.debug, handler, this.config.options); } - wrapWithTls(socket: net.Socket, signal: AbortSignal): Promise { + async wrapWithTls(socket: net.Socket, signal: AbortSignal): Promise { signal.throwIfAborted(); - return new Promise((resolve, reject) => { - const secureContext = tls.createSecureContext(this.secureContextOptions); - // If connect to an ip address directly, - // need to set the servername to an empty string - // if the user has not given a servername explicitly - const serverName = !net.isIP(this.config.server) ? this.config.server : ''; - const encryptOptions = { - host: this.config.server, - socket: socket, - ALPNProtocols: ['tds/8.0'], - secureContext: secureContext, - servername: this.config.options.serverName ? this.config.options.serverName : serverName, - }; - - const encryptsocket = tls.connect(encryptOptions); - - const onAbort = () => { - encryptsocket.removeListener('error', onError); - encryptsocket.removeListener('connect', onConnect); - - encryptsocket.destroy(); - - reject(signal.reason); - }; + const secureContext = tls.createSecureContext(this.secureContextOptions); + // If connect to an ip address directly, + // need to set the servername to an empty string + // if the user has not given a servername explicitly + const serverName = !net.isIP(this.config.server) ? this.config.server : ''; + const encryptOptions = { + host: this.config.server, + socket: socket, + ALPNProtocols: ['tds/8.0'], + secureContext: secureContext, + servername: this.config.options.serverName ? this.config.options.serverName : serverName, + }; - const onError = (err: Error) => { - signal.removeEventListener('abort', onAbort); + const { promise, resolve, reject } = withResolvers(); + const encryptsocket = tls.connect(encryptOptions); - encryptsocket.removeListener('error', onError); - encryptsocket.removeListener('connect', onConnect); + try { + const onAbort = () => { reject(signal.reason); }; + signal.addEventListener('abort', onAbort, { once: true }); - encryptsocket.destroy(); + try { + const onError = reject; + const onConnect = () => { resolve(encryptsocket); }; - reject(err); - }; + encryptsocket.once('error', onError); + encryptsocket.once('secureConnect', onConnect); - const onConnect = () => { + try { + return await promise; + } finally { + encryptsocket.removeListener('error', onError); + encryptsocket.removeListener('connect', onConnect); + } + } finally { signal.removeEventListener('abort', onAbort); + } + } catch (err: any) { + encryptsocket.destroy(); - encryptsocket.removeListener('error', onError); - encryptsocket.removeListener('connect', onConnect); - - resolve(encryptsocket); - }; - - signal.addEventListener('abort', onAbort, { once: true }); - - encryptsocket.on('error', onError); - encryptsocket.on('secureConnect', onConnect); - }); + throw err; + } } async connectOnPort(port: number, multiSubnetFailover: boolean, signal: AbortSignal, customConnector?: () => Promise) { @@ -2388,18 +2386,6 @@ class Connection extends EventEmitter { } } - /** - * @private - */ - socketError(error: Error) { - if (this.state === this.STATE.CONNECTING || this.state === this.STATE.SENT_TLSSSLNEGOTIATION) { - this.emit('connect', this.wrapSocketError(error)); - } else { - this.emit('error', this.wrapSocketError(error)); - } - this.dispatchEvent('socketError', error); - } - /** * @private */ @@ -2408,7 +2394,11 @@ class Connection extends EventEmitter { if (this.state !== this.STATE.FINAL) { const error: ErrorWithCode = new Error('socket hang up'); error.code = 'ECONNRESET'; - this.socketError(error); + + this.dispatchEvent('socketError', error); + process.nextTick(() => { + this.emit('error', this.wrapSocketError(error)); + }); } } @@ -2418,6 +2408,7 @@ class Connection extends EventEmitter { socketClose() { this.debug.log('connection to ' + this.config.server + ':' + this.config.options.port + ' closed'); this.transitionTo(this.STATE.FINAL); + this.cleanupConnection(); } /** @@ -3290,7 +3281,7 @@ class Connection extends EventEmitter { /** * @private */ - async performSentPrelogin(signal: AbortSignal) { + async performTlsNegotiation(preloginPayload: PreloginPayload, signal: AbortSignal) { signal.throwIfAborted(); const { promise: signalAborted, reject } = withResolvers(); @@ -3299,34 +3290,6 @@ class Connection extends EventEmitter { signal.addEventListener('abort', onAbort, { once: true }); try { - let messageBuffer = Buffer.alloc(0); - - const message = await Promise.race([ - this.messageIo.readMessage().catch((err) => { - throw this.wrapSocketError(err); - }), - signalAborted - ]); - - const iterator = message[Symbol.asyncIterator](); - while (true) { - const { done, value } = await Promise.race([ - iterator.next(), - signalAborted - ]); - - if (done) { - break; - } - - messageBuffer = Buffer.concat([messageBuffer, value]); - } - - const preloginPayload = new PreloginPayload(messageBuffer); - this.debug.payload(function() { - return preloginPayload.toString(' '); - }); - if (preloginPayload.fedAuthRequired === 1) { this.fedAuthRequired = true; } @@ -3348,10 +3311,58 @@ class Connection extends EventEmitter { } } + async readPreloginResponse(signal: AbortSignal): Promise { + signal.throwIfAborted(); + + let messageBuffer = Buffer.alloc(0); + + const { promise: signalAborted, reject } = withResolvers(); + + const onAbort = () => { reject(signal.reason); }; + signal.addEventListener('abort', onAbort, { once: true }); + + try { + const message = await Promise.race([ + this.messageIo.readMessage().catch((err) => { + throw this.wrapSocketError(err); + }), + signalAborted + ]); + + const iterator = message[Symbol.asyncIterator](); + try { + while (true) { + const { done, value } = await Promise.race([ + iterator.next(), + signalAborted + ]); + + if (done) { + break; + } + + messageBuffer = Buffer.concat([messageBuffer, value]); + } + } finally { + if (iterator.return) { + await iterator.return(); + } + } + } finally { + signal.removeEventListener('abort', onAbort); + } + + const preloginPayload = new PreloginPayload(messageBuffer); + this.debug.payload(function() { + return preloginPayload.toString(' '); + }); + return preloginPayload; + } + /** * @private */ - performReRouting() { + async performReRouting() { this.socket!.removeListener('error', this._onSocketError); this.socket!.removeListener('close', this._onSocketClose); this.socket!.removeListener('end', this._onSocketEnd); @@ -3364,16 +3375,15 @@ class Connection extends EventEmitter { // Attempt connecting to the rerouting target this.transitionTo(this.STATE.CONNECTING); + await this.initialiseConnection(); } /** * @private */ - performTransientFailureRetry() { + async performTransientFailureRetry() { this.curTransientRetryCount++; - this.loginError = undefined; - this.socket!.removeListener('error', this._onSocketError); this.socket!.removeListener('close', this._onSocketClose); this.socket!.removeListener('end', this._onSocketEnd); @@ -3385,10 +3395,13 @@ class Connection extends EventEmitter { const port = this.routingData ? this.routingData.port : this.config.options.port; this.debug.log('Retry after transient failure connecting to ' + server + ':' + port); - setTimeout(() => { - this.emit('retry'); - this.transitionTo(this.STATE.CONNECTING); - }, this.config.options.connectionRetryInterval); + const { promise, resolve } = withResolvers(); + setTimeout(resolve, this.config.options.connectionRetryInterval); + await promise; + + this.emit('retry'); + this.transitionTo(this.STATE.CONNECTING); + await this.initialiseConnection(); } /** @@ -3422,6 +3435,7 @@ class Connection extends EventEmitter { throw new ConnectionError('Login failed.', 'ELOGIN'); } } finally { + this.loginError = undefined; signal.removeEventListener('abort', onAbort); } } @@ -3478,6 +3492,7 @@ class Connection extends EventEmitter { } } } finally { + this.loginError = undefined; signal.removeEventListener('abort', onAbort); } } @@ -3562,6 +3577,8 @@ class Connection extends EventEmitter { signalAborted ]); } catch (err) { + signal.throwIfAborted(); + throw new AggregateError( [new ConnectionError('Security token could not be authenticated or authorized.', 'EFEDAUTH'), err]); } @@ -3582,6 +3599,7 @@ class Connection extends EventEmitter { throw new ConnectionError('Login failed.', 'ELOGIN'); } } finally { + this.loginError = undefined; signal.removeEventListener('abort', onAbort); } } @@ -3635,9 +3653,6 @@ Connection.prototype.STATE = { }, CONNECTING: { name: 'Connecting', - enter: function() { - this.initialiseConnection(); - }, events: {} }, SENT_PRELOGIN: { @@ -3677,6 +3692,7 @@ Connection.prototype.STATE = { events: { socketError: function() { this.transitionTo(this.STATE.FINAL); + this.cleanupConnection(); } } }, @@ -3688,7 +3704,11 @@ Connection.prototype.STATE = { try { message = await this.messageIo.readMessage(); } catch (err: any) { - return this.socketError(err); + this.dispatchEvent('socketError', err); + process.nextTick(() => { + this.emit('error', this.wrapSocketError(err)); + }); + return; } // request timer is stopped on first data package this.clearRequestTimer(); @@ -3768,6 +3788,7 @@ Connection.prototype.STATE = { const sqlRequest = this.request!; this.request = undefined; this.transitionTo(this.STATE.FINAL); + this.cleanupConnection(); sqlRequest.callback(err); } @@ -3781,7 +3802,11 @@ Connection.prototype.STATE = { try { message = await this.messageIo.readMessage(); } catch (err: any) { - return this.socketError(err); + this.dispatchEvent('socketError', err); + process.nextTick(() => { + this.emit('error', this.wrapSocketError(err)); + }); + return; } const handler = new AttentionTokenHandler(this, this.request!); @@ -3815,6 +3840,7 @@ Connection.prototype.STATE = { this.request = undefined; this.transitionTo(this.STATE.FINAL); + this.cleanupConnection(); sqlRequest.callback(err); } @@ -3822,16 +3848,6 @@ Connection.prototype.STATE = { }, FINAL: { name: 'Final', - enter: function() { - this.cleanupConnection(); - }, - events: { - message: function() { - // Do nothing - }, - socketError: function() { - // Do nothing - } - } + events: {} } };