From cec4cedc22c7b9244ed38774a4b497fdfad41553 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 26 Dec 2024 13:42:47 +0100 Subject: [PATCH 1/4] worker: flush stdout and stderr on exit Signed-off-by: Matteo Collina --- test/parallel/test-worker-stdio-flush.js | 28 ++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 test/parallel/test-worker-stdio-flush.js diff --git a/test/parallel/test-worker-stdio-flush.js b/test/parallel/test-worker-stdio-flush.js new file mode 100644 index 00000000000000..0528b8758ca5a4 --- /dev/null +++ b/test/parallel/test-worker-stdio-flush.js @@ -0,0 +1,28 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const fs = require('fs'); +const util = require('util'); +const { Writable } = require('stream'); +const { Worker, isMainThread } = require('worker_threads'); + +if (isMainThread) { + const w = new Worker(__filename, { stdout: true }); + const expected = 'hello world' + + let data = '' + w.stdout.setEncoding('utf8') + w.stdout.on('data', (chunk) => { + data += chunk + }) + + w.on('exit', common.mustCall(() => { + assert.strictEqual(data, expected) + })); +} else { + process.stdout.write('hello') + process.on('exit', () => { + process.stdout.write(' ') + process.stdout.write('world') + }) +} From 1be4989fdce87495b8dbfbde55528524a20d1e4a Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Wed, 1 Jan 2025 17:52:26 -0500 Subject: [PATCH 2/4] fixup Signed-off-by: Matteo Collina --- lib/internal/worker/io.js | 3 +++ test/parallel/test-worker-stdio-flush.js | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js index 42b8845cec6711..111a43506ada2f 100644 --- a/lib/internal/worker/io.js +++ b/lib/internal/worker/io.js @@ -292,6 +292,9 @@ class WritableWorkerStdio extends Writable { chunks: ArrayPrototypeMap(chunks, ({ chunk, encoding }) => ({ chunk, encoding })), }); + if (process._exiting) { + cb() + } ArrayPrototypePush(this[kWritableCallbacks], cb); if (this[kPort][kWaitingStreams]++ === 0) this[kPort].ref(); diff --git a/test/parallel/test-worker-stdio-flush.js b/test/parallel/test-worker-stdio-flush.js index 0528b8758ca5a4..179a780c86d301 100644 --- a/test/parallel/test-worker-stdio-flush.js +++ b/test/parallel/test-worker-stdio-flush.js @@ -20,9 +20,9 @@ if (isMainThread) { assert.strictEqual(data, expected) })); } else { - process.stdout.write('hello') process.on('exit', () => { process.stdout.write(' ') process.stdout.write('world') }) + process.stdout.write('hello') } From 5ac939c0f521b94584dccd428f06fa831bf92067 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Sat, 4 Jan 2025 20:23:48 -0500 Subject: [PATCH 3/4] fixup Signed-off-by: Matteo Collina --- .../bootstrap/switches/is_not_main_thread.js | 13 ++++++++- lib/internal/worker/io.js | 7 ++--- .../test-worker-stdio-flush-inflight.js | 27 +++++++++++++++++++ 3 files changed, 43 insertions(+), 4 deletions(-) create mode 100644 test/parallel/test-worker-stdio-flush-inflight.js diff --git a/lib/internal/bootstrap/switches/is_not_main_thread.js b/lib/internal/bootstrap/switches/is_not_main_thread.js index 03aa7c3ebe12f2..6fa30aec748af0 100644 --- a/lib/internal/bootstrap/switches/is_not_main_thread.js +++ b/lib/internal/bootstrap/switches/is_not_main_thread.js @@ -33,11 +33,22 @@ process.removeListener('removeListener', stopListeningIfSignal); const { createWorkerStdio, + kStdioWantsMoreDataCallback, } = require('internal/worker/io'); let workerStdio; function lazyWorkerStdio() { - return workerStdio ??= createWorkerStdio(); + if (workerStdio === undefined) { + workerStdio = createWorkerStdio(); + process.on('exit', flushSync); + } + + return workerStdio; +} + +function flushSync() { + workerStdio.stdout[kStdioWantsMoreDataCallback](); + workerStdio.stderr[kStdioWantsMoreDataCallback](); } function getStdout() { return lazyWorkerStdio().stdout; } diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js index 111a43506ada2f..c31833976192c2 100644 --- a/lib/internal/worker/io.js +++ b/lib/internal/worker/io.js @@ -294,10 +294,11 @@ class WritableWorkerStdio extends Writable { }); if (process._exiting) { cb() + } else { + ArrayPrototypePush(this[kWritableCallbacks], cb); + if (this[kPort][kWaitingStreams]++ === 0) + this[kPort].ref(); } - ArrayPrototypePush(this[kWritableCallbacks], cb); - if (this[kPort][kWaitingStreams]++ === 0) - this[kPort].ref(); } _final(cb) { diff --git a/test/parallel/test-worker-stdio-flush-inflight.js b/test/parallel/test-worker-stdio-flush-inflight.js new file mode 100644 index 00000000000000..93880c59537bf2 --- /dev/null +++ b/test/parallel/test-worker-stdio-flush-inflight.js @@ -0,0 +1,27 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const fs = require('fs'); +const util = require('util'); +const { Writable } = require('stream'); +const { Worker, isMainThread } = require('worker_threads'); + +if (isMainThread) { + const w = new Worker(__filename, { stdout: true }); + const expected = 'hello world' + + let data = '' + w.stdout.setEncoding('utf8') + w.stdout.on('data', (chunk) => { + data += chunk + }) + + w.on('exit', common.mustCall(() => { + assert.strictEqual(data, expected) + })); +} else { + process.stdout.write('hello') + process.stdout.write(' ') + process.stdout.write('world') + process.exit(0) +} From de200d845ba6637cdf3bb823fe050dbd2926f89a Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Sat, 4 Jan 2025 21:01:08 -0500 Subject: [PATCH 4/4] fixup Signed-off-by: Matteo Collina --- lib/internal/worker/io.js | 2 +- .../test-worker-stdio-flush-inflight.js | 23 ++++++++----------- test/parallel/test-worker-stdio-flush.js | 23 ++++++++----------- 3 files changed, 21 insertions(+), 27 deletions(-) diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js index c31833976192c2..2b28c6a2487b11 100644 --- a/lib/internal/worker/io.js +++ b/lib/internal/worker/io.js @@ -293,7 +293,7 @@ class WritableWorkerStdio extends Writable { ({ chunk, encoding }) => ({ chunk, encoding })), }); if (process._exiting) { - cb() + cb(); } else { ArrayPrototypePush(this[kWritableCallbacks], cb); if (this[kPort][kWaitingStreams]++ === 0) diff --git a/test/parallel/test-worker-stdio-flush-inflight.js b/test/parallel/test-worker-stdio-flush-inflight.js index 93880c59537bf2..34b81152811e7b 100644 --- a/test/parallel/test-worker-stdio-flush-inflight.js +++ b/test/parallel/test-worker-stdio-flush-inflight.js @@ -1,27 +1,24 @@ 'use strict'; const common = require('../common'); const assert = require('assert'); -const fs = require('fs'); -const util = require('util'); -const { Writable } = require('stream'); const { Worker, isMainThread } = require('worker_threads'); if (isMainThread) { const w = new Worker(__filename, { stdout: true }); - const expected = 'hello world' + const expected = 'hello world'; - let data = '' - w.stdout.setEncoding('utf8') + let data = ''; + w.stdout.setEncoding('utf8'); w.stdout.on('data', (chunk) => { - data += chunk - }) + data += chunk; + }); w.on('exit', common.mustCall(() => { - assert.strictEqual(data, expected) + assert.strictEqual(data, expected); })); } else { - process.stdout.write('hello') - process.stdout.write(' ') - process.stdout.write('world') - process.exit(0) + process.stdout.write('hello'); + process.stdout.write(' '); + process.stdout.write('world'); + process.exit(0); } diff --git a/test/parallel/test-worker-stdio-flush.js b/test/parallel/test-worker-stdio-flush.js index 179a780c86d301..e52e721fc69483 100644 --- a/test/parallel/test-worker-stdio-flush.js +++ b/test/parallel/test-worker-stdio-flush.js @@ -1,28 +1,25 @@ 'use strict'; const common = require('../common'); const assert = require('assert'); -const fs = require('fs'); -const util = require('util'); -const { Writable } = require('stream'); const { Worker, isMainThread } = require('worker_threads'); if (isMainThread) { const w = new Worker(__filename, { stdout: true }); - const expected = 'hello world' + const expected = 'hello world'; - let data = '' - w.stdout.setEncoding('utf8') + let data = ''; + w.stdout.setEncoding('utf8'); w.stdout.on('data', (chunk) => { - data += chunk - }) + data += chunk; + }); w.on('exit', common.mustCall(() => { - assert.strictEqual(data, expected) + assert.strictEqual(data, expected); })); } else { process.on('exit', () => { - process.stdout.write(' ') - process.stdout.write('world') - }) - process.stdout.write('hello') + process.stdout.write(' '); + process.stdout.write('world'); + }); + process.stdout.write('hello'); }