Skip to content

Commit

Permalink
Fix node.js stream blockage in mxstream channels
Browse files Browse the repository at this point in the history
When the node.js implementation of a `MultiplexingStream` channel receives more data than the highWatermark (16KB) limit, a flowing stream stops flowing permanently, blocking all communication in that direction.

This fixes the problem by calling `resume()` on the stream when flowing has been stopped by that particular data buffer.
  • Loading branch information
AArnott committed Aug 23, 2024
1 parent 026821f commit 82b8983
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/nerdbank-streams/src/Channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,17 @@ export class ChannelClass extends Channel {
}

public onContent(buffer: Buffer | null) {
const priorReadableFlowing = this._duplex.readableFlowing

this._duplex.push(buffer)

// Large buffer pushes can switch a stream from flowing to non-flowing
// when it meets or exceeds the highWaterMark. We need to resume the stream
// in this case so that the user can continue to receive data.
if (priorReadableFlowing && this._duplex.readableFlowing === false) {
this._duplex.resume()
}

// We should find a way to detect when we *actually* share the received buffer with the Channel's user
// and only report consumption when they receive the buffer from us so that we effectively apply
// backpressure to the remote party based on our user's actual consumption rather than continually allocating memory.
Expand Down
53 changes: 53 additions & 0 deletions src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,59 @@ import { Channel } from '../Channel'
import CancellationToken from 'cancellationtoken'
import * as assert from 'assert'
import { nextTick } from 'process'
import { Duplex } from 'stream'

it('highWatermark threshold does not clog', async () => {
// Brokered service
let bytesToReceive = 0
let receivedAllBytes = new Deferred()
function receiver(pipe: Duplex) {
let lengths: number[] = []
pipe.on('data', (data: Buffer) => {
lengths.push(data.length)

bytesToReceive -= data.length
// console.log(`recv ${data.length}. ${bytesToReceive} remaining`)
if (bytesToReceive <= 0) {
receivedAllBytes.resolve(undefined)
}
})
}

// IServiceBroker
const { first: localServicePipe, second: servicePipe } = FullDuplexStream.CreatePair()
receiver(localServicePipe)

// MultiplexingStreamServiceBroker
const simulatedMxStream = FullDuplexStream.CreatePair()
const [mx1, mx2] = await Promise.all([MultiplexingStream.CreateAsync(simulatedMxStream.first), MultiplexingStream.CreateAsync(simulatedMxStream.second)])
const [local, remote] = await Promise.all([mx1.offerChannelAsync(''), mx2.acceptChannelAsync('')])
servicePipe.pipe(local.stream)
local.stream.pipe(servicePipe)

global.test_servicePipe = servicePipe
global.test_d = local.stream
global.test_localServicePipe = localServicePipe

// brokered service client
function writeHelper(buffer: Buffer): boolean {
bytesToReceive += buffer.length
const result = remote.stream.write(buffer)
// console.log('written', buffer.length, result)
return result
}
for (let i = 15; i < 20; i++) {
const buffer = Buffer.alloc(i * 1024)
writeHelper(buffer)
await nextTickAsync()
writeHelper(Buffer.alloc(10))
await nextTickAsync()
}

if (bytesToReceive > 0) {
await receivedAllBytes.promise
}
})
;[1, 2, 3].forEach(protocolMajorVersion => {
describe(`MultiplexingStream v${protocolMajorVersion}`, () => {
let mx1: MultiplexingStream
Expand Down

0 comments on commit 82b8983

Please sign in to comment.