From 82b8983356cc52cdb6f3fb69e4ffd30bd4e236d3 Mon Sep 17 00:00:00 2001
From: Andrew Arnott
Date: Fri, 23 Aug 2024 17:03:44 -0600
Subject: [PATCH] Fix node.js stream blockage in mxstream channels

When the Node.js implementation of a `MultiplexingStream` channel
receives more data than the `highWaterMark` limit (16 KiB by default),
its flowing stream stops flowing permanently, blocking all
communication in that direction.

This fixes the problem by calling `resume()` on the stream whenever a
pushed buffer switches it out of flowing mode.
---
 src/nerdbank-streams/src/Channel.ts           |  9 ++++
 .../src/tests/MultiplexingStream.spec.ts      | 50 ++++++++++++++++++++
 2 files changed, 59 insertions(+)

diff --git a/src/nerdbank-streams/src/Channel.ts b/src/nerdbank-streams/src/Channel.ts
index ef58f8ce..d9b17e16 100644
--- a/src/nerdbank-streams/src/Channel.ts
+++ b/src/nerdbank-streams/src/Channel.ts
@@ -216,8 +216,17 @@ export class ChannelClass extends Channel {
     }
 
     public onContent(buffer: Buffer | null) {
+        const priorReadableFlowing = this._duplex.readableFlowing
+
         this._duplex.push(buffer)
 
+        // A large push can switch the stream from flowing to non-flowing when the
+        // buffered data meets or exceeds the highWaterMark. Resume the stream in
+        // that case so that the user continues to receive data.
+        if (priorReadableFlowing && this._duplex.readableFlowing === false) {
+            this._duplex.resume()
+        }
+
         // We should find a way to detect when we *actually* share the received buffer with the Channel's user
         // and only report consumption when they receive the buffer from us so that we effectively apply
         // backpressure to the remote party based on our user's actual consumption rather than continually allocating memory.
diff --git a/src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts b/src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts
index 04dbfe19..d28e4e84 100644
--- a/src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts
+++ b/src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts
@@ -8,6 +8,56 @@ import { Channel } from '../Channel'
 import CancellationToken from 'cancellationtoken'
 import * as assert from 'assert'
 import { nextTick } from 'process'
+import { Duplex } from 'stream'
+
+it('highWaterMark threshold does not clog', async () => {
+    // Brokered service: counts the bytes that arrive and signals once everything written has been received.
+    let bytesToReceive = 0
+    let receivedAllBytes = new Deferred()
+    function receiver(pipe: Duplex) {
+        let lengths: number[] = []
+        pipe.on('data', (data: Buffer) => {
+            lengths.push(data.length)
+
+            bytesToReceive -= data.length
+            // console.log(`recv ${data.length}. ${bytesToReceive} remaining`)
+            if (bytesToReceive <= 0) {
+                receivedAllBytes.resolve(undefined)
+            }
+        })
+    }
+
+    // IServiceBroker
+    const { first: localServicePipe, second: servicePipe } = FullDuplexStream.CreatePair()
+    receiver(localServicePipe)
+
+    // MultiplexingStreamServiceBroker
+    const simulatedMxStream = FullDuplexStream.CreatePair()
+    const [mx1, mx2] = await Promise.all([MultiplexingStream.CreateAsync(simulatedMxStream.first), MultiplexingStream.CreateAsync(simulatedMxStream.second)])
+    const [local, remote] = await Promise.all([mx1.offerChannelAsync(''), mx2.acceptChannelAsync('')])
+    servicePipe.pipe(local.stream)
+    local.stream.pipe(servicePipe)
+
+    // Brokered service client: writes payloads that straddle the 16 KiB highWaterMark.
+    function writeHelper(buffer: Buffer): boolean {
+        bytesToReceive += buffer.length
+        const result = remote.stream.write(buffer)
+        // console.log('written', buffer.length, result)
+        return result
+    }
+    for (let i = 15; i < 20; i++) {
+        const buffer = Buffer.alloc(i * 1024)
+        writeHelper(buffer)
+        await nextTickAsync()
+        writeHelper(Buffer.alloc(10))
+        await nextTickAsync()
+    }
+
+    if (bytesToReceive > 0) {
+        await receivedAllBytes.promise
+    }
+})
+
 ;[1, 2, 3].forEach(protocolMajorVersion => {
     describe(`MultiplexingStream v${protocolMajorVersion}`, () => {
         let mx1: MultiplexingStream
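
For reference, below is a minimal standalone sketch (not part of the patch) of
the Node.js Readable behavior that onContent() now guards against. It assumes
only Node's built-in 'stream' module; the variable names are illustrative, and
16 * 1024 is Node's default highWaterMark for byte streams.

    import { Readable } from 'stream'

    // A source we push into manually; read() is a no-op because data arrives
    // via push() below, mirroring how Channel.onContent() feeds its duplex.
    const readable = new Readable({ highWaterMark: 16 * 1024, read() { } })

    readable.on('data', chunk => {
        // Attaching a 'data' listener puts the stream into flowing mode.
        console.log(`received ${chunk.length} bytes; flowing=${readable.readableFlowing}`)
    })

    const priorFlowing = readable.readableFlowing

    // Pushing a buffer that meets or exceeds highWaterMark may leave the stream
    // non-flowing, which is the clog observed in mxstream channels.
    readable.push(Buffer.alloc(16 * 1024))

    if (priorFlowing && readable.readableFlowing === false) {
        // The same recovery the patch applies in Channel.onContent(): resume the
        // stream so that 'data' events keep firing.
        readable.resume()
    }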