Skip to content

Commit

Permalink
TransformStream cleanup using transformer.cancel
Browse files Browse the repository at this point in the history
Add a "cancel" hook to "Transformer". This allows users to perform resource cleanup when the readable side of the TransformStream is cancelled, or the writable side is aborted.

To preserve existing behavior, when the readable side is cancelled with a reason, the writable side is always immediately aborted with that same reason. The same is true in the reverse case. This means that the status of both sides is always either "closed", "erroring", or "erroring" when the "cancel" hook is called.

"flush" and "cancel" are never both called. As per existing behavior, when the writable side is closed the "flush" hook is called. If the readable side is cancelled while a promise returned from "flush" is still pending, "cancel" is not called. In this scenario the readable side ends up in the "errored" state, while the writable side ends up in the "closed" state.
  • Loading branch information
lucacasonato authored Sep 30, 2023
1 parent 449df68 commit 007d729
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 43 deletions.
156 changes: 131 additions & 25 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -5501,13 +5501,15 @@ dictionary Transformer {
TransformerStartCallback start;
TransformerTransformCallback transform;
TransformerFlushCallback flush;
TransformerCancelCallback cancel;
any readableType;
any writableType;
};

callback TransformerStartCallback = any (TransformStreamDefaultController controller);
callback TransformerFlushCallback = Promise<undefined> (TransformStreamDefaultController controller);
callback TransformerTransformCallback = Promise<undefined> (any chunk, TransformStreamDefaultController controller);
callback TransformerCancelCallback = Promise<undefined> (any reason);
</xmp>

<dl>
Expand Down Expand Up @@ -5570,6 +5572,25 @@ callback TransformerTransformCallback = Promise<undefined> (any chunk, Transform
{{Transformer/flush|flush()}}; the stream is already in the process of successfully closing down,
and terminating it would be counterproductive.)

<dt><dfn dict-member for="Transformer" lt="cancel">cancel(<var ignore>reason</var>)</dfn></dt>
<dd>
<p>A function called when the [=readable side=] is cancelled, or when the [=writable side=] is
aborted.

<p>Typically this is used to clean up underlying transformer resources when the stream is aborted
or cancelled.

<p>If the cancellation process is asynchronous, the function can return a promise to signal
success or failure; the result will be communicated to the caller of
{{WritableStream/abort()|stream.writable.abort()}} or
{{ReadableStream/cancel()|stream.readable.cancel()}}. Throwing an exception is treated the same
as returning a rejected promise.

<p>(Note that there is no need to call
{{TransformStreamDefaultController/terminate()|controller.terminate()}} inside
{{Transformer/cancel|cancel()}}; the stream is already in the process of cancelling/aborting, and
terminating it would be counterproductive.)

<dt><dfn dict-member for="Transformer">readableType</dfn></dt>
<dd>
<p>This property is reserved for future use, so any attempts to supply a value will throw an
Expand All @@ -5583,8 +5604,8 @@ callback TransformerTransformCallback = Promise<undefined> (any chunk, Transform

The <code>controller</code> object passed to {{Transformer/start|start()}},
{{Transformer/transform|transform()}}, and {{Transformer/flush|flush()}} is an instance of
{{TransformStreamDefaultController}}, and has the ability to enqueue [=chunks=] to the [=readable
side=], or to terminate or error the stream.
{{TransformStreamDefaultController}}, and has the ability to enqueue [=chunks=] to the
[=readable side=], or to terminate or error the stream.

<h4 id="ts-prototype">Constructor and properties</h4>

Expand Down Expand Up @@ -5738,6 +5759,16 @@ the following table:
<th>Internal Slot</th>
<th>Description (<em>non-normative</em>)</th>
<tbody>
<tr>
<td><dfn>\[[cancelAlgorithm]]</dfn>
<td class="non-normative">A promise-returning algorithm, taking one argument (the reason for
cancellation), which communicates a requested cancellation to the [=transformer=]
<tr>
<td><dfn>\[[finishPromise]]</dfn>
<td class="non-normative">A promise which resolves on completion of either the
[=TransformStreamDefaultController/[[cancelAlgorithm]]=] or the
[=TransformStreamDefaultController/[[flushAlgorithm]]=]. If this field is unpopulated (that is,
undefined), then neither of those algorithms have been [=invoked=] yet
<tr>
<td><dfn>\[[flushAlgorithm]]</dfn>
<td class="non-normative">A promise-returning algorithm which communicates a requested close to
Expand Down Expand Up @@ -5831,8 +5862,7 @@ The following abstract operations operate on {{TransformStream}} instances at a
1. Let |pullAlgorithm| be the following steps:
1. Return ! [$TransformStreamDefaultSourcePullAlgorithm$](|stream|).
1. Let |cancelAlgorithm| be the following steps, taking a |reason| argument:
1. Perform ! [$TransformStreamErrorWritableAndUnblockWrite$](|stream|, |reason|).
1. Return [=a promise resolved with=] undefined.
1. Return ! [$TransformStreamDefaultSourceCancelAlgorithm$](|stream|, |reason|).
1. Set |stream|.[=TransformStream/[[readable]]=] to ! [$CreateReadableStream$](|startAlgorithm|,
|pullAlgorithm|, |cancelAlgorithm|, |readableHighWaterMark|, |readableSizeAlgorithm|).
1. Set |stream|.[=TransformStream/[[backpressure]]=] and
Expand Down Expand Up @@ -5866,12 +5896,7 @@ The following abstract operations operate on {{TransformStream}} instances at a
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|stream|.[=TransformStream/[[controller]]=]).
1. Perform !
[$WritableStreamDefaultControllerErrorIfNeeded$](|stream|.[=TransformStream/[[writable]]=].[=WritableStream/[[controller]]=], |e|).
1. If |stream|.[=TransformStream/[[backpressure]]=] is true, perform ! [$TransformStreamSetBackpressure$](|stream|,
false).

<p class="note">The [$TransformStreamDefaultSinkWriteAlgorithm$] abstract operation could be
waiting for the promise stored in the [=TransformStream/[[backpressureChangePromise]]=] slot to
resolve. The call to [$TransformStreamSetBackpressure$] ensures that the promise always resolves.
1. Perform ! [$TransformStreamUnblockWrite$](|stream|).
</div>

<div algorithm>
Expand All @@ -5886,6 +5911,19 @@ The following abstract operations operate on {{TransformStream}} instances at a
1. Set |stream|.[=TransformStream/[[backpressure]]=] to |backpressure|.
</div>

<div algorithm>
<dfn abstract-op lt="TransformStreamUnblockWrite"
id="transform-stream-unblock-write">TransformStreamUnblockWrite(|stream|)</dfn> performs the
following steps:

1. If |stream|.[=TransformStream/[[backpressure]]=] is true, perform ! [$TransformStreamSetBackpressure$](|stream|,
false).

<p class="note">The [$TransformStreamDefaultSinkWriteAlgorithm$] abstract operation could be
waiting for the promise stored in the [=TransformStream/[[backpressureChangePromise]]=] slot to
resolve. The call to [$TransformStreamSetBackpressure$] ensures that the promise always resolves.
</div>

<h4 id="ts-default-controller-abstract-ops">Default controllers</h4>

The following abstract operations support the implementaiton of the
Expand All @@ -5894,7 +5932,8 @@ The following abstract operations support the implementaiton of the
<div algorithm>
<dfn abstract-op lt="SetUpTransformStreamDefaultController"
id="set-up-transform-stream-default-controller">SetUpTransformStreamDefaultController(|stream|,
|controller|, |transformAlgorithm|, |flushAlgorithm|)</dfn> performs the following steps:
|controller|, |transformAlgorithm|, |flushAlgorithm|, |cancelAlgorithm|)</dfn> performs the
following steps:

1. Assert: |stream| [=implements=] {{TransformStream}}.
1. Assert: |stream|.[=TransformStream/[[controller]]=] is undefined.
Expand All @@ -5903,6 +5942,7 @@ The following abstract operations support the implementaiton of the
1. Set |controller|.[=TransformStreamDefaultController/[[transformAlgorithm]]=] to
|transformAlgorithm|.
1. Set |controller|.[=TransformStreamDefaultController/[[flushAlgorithm]]=] to |flushAlgorithm|.
1. Set |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=] to |cancelAlgorithm|.
</div>

<div algorithm>
Expand All @@ -5916,15 +5956,20 @@ The following abstract operations support the implementaiton of the
1. If |result| is an abrupt completion, return [=a promise rejected with=] |result|.\[[Value]].
1. Otherwise, return [=a promise resolved with=] undefined.
1. Let |flushAlgorithm| be an algorithm which returns [=a promise resolved with=] undefined.
1. Let |cancelAlgorithm| be an algorithm which returns [=a promise resolved with=] undefined.
1. If |transformerDict|["{{Transformer/transform}}"] [=map/exists=], set |transformAlgorithm| to an
algorithm which takes an argument |chunk| and returns the result of [=invoking=]
|transformerDict|["{{Transformer/transform}}"] with argument list «&nbsp;|chunk|,
|controller|&nbsp;» and [=callback this value=] |transformer|.
1. If |transformerDict|["{{Transformer/flush}}"] [=map/exists=], set |flushAlgorithm| to an
algorithm which returns the result of [=invoking=] |transformerDict|["{{Transformer/flush}}"]
with argument list «&nbsp;|controller|&nbsp;» and [=callback this value=] |transformer|.
1. If |transformerDict|["{{Transformer/cancel}}"] [=map/exists=], set |cancelAlgorithm| to an
algorithm which takes an argument |reason| and returns the result of [=invoking=]
|transformerDict|["{{Transformer/cancel}}"] with argument list «&nbsp;|reason|&nbsp;» and
[=callback this value=] |transformer|.
1. Perform ! [$SetUpTransformStreamDefaultController$](|stream|, |controller|,
|transformAlgorithm|, |flushAlgorithm|).
|transformAlgorithm|, |flushAlgorithm|, |cancelAlgorithm|).
</div>

<div algorithm>
Expand All @@ -5943,6 +5988,7 @@ The following abstract operations support the implementaiton of the

1. Set |controller|.[=TransformStreamDefaultController/[[transformAlgorithm]]=] to undefined.
1. Set |controller|.[=TransformStreamDefaultController/[[flushAlgorithm]]=] to undefined.
1. Set |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=] to undefined.
</div>

<div algorithm>
Expand Down Expand Up @@ -6033,36 +6079,89 @@ side=] of [=transform streams=].
id="transform-stream-default-sink-abort-algorithm">TransformStreamDefaultSinkAbortAlgorithm(|stream|,
|reason|)</dfn> performs the following steps:

1. Perform ! [$TransformStreamError$](|stream|, |reason|).
1. Return [=a promise resolved with=] undefined.
1. Let |controller| be |stream|.[=TransformStream/[[controller]]=].
1. If |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] is not undefined, return
|controller|.[=TransformStreamDefaultController/[[finishPromise]]=].
1. Let |readable| be |stream|.[=TransformStream/[[readable]]=].
1. Let |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] be a new promise.
1. Let |cancelPromise| be the result of performing
|controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=], passing |reason|.
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|).
1. [=React=] to |cancelPromise|:
1. If |cancelPromise| was fulfilled, then:
1. If |readable|.[=ReadableStream/[[state]]=] is "`errored`", [=reject=]
|controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with
|readable|.[=ReadableStream/[[storedError]]=].
1. Otherwise:
1. Perform ! [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |reason|).
1. [=Resolve=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with undefined.
1. If |cancelPromise| was rejected with reason |r|, then:
1. Perform ! [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |r|).
1. [=Reject=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with |r|.
1. Return |controller|.[=TransformStreamDefaultController/[[finishPromise]]=].
</div>

<div algorithm>
<dfn abstract-op lt="TransformStreamDefaultSinkCloseAlgorithm"
id="transform-stream-default-sink-close-algorithm">TransformStreamDefaultSinkCloseAlgorithm(|stream|)</dfn>
performs the following steps:

1. Let |readable| be |stream|.[=TransformStream/[[readable]]=].
1. Let |controller| be |stream|.[=TransformStream/[[controller]]=].
1. If |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] is not undefined, return
|controller|.[=TransformStreamDefaultController/[[finishPromise]]=].
1. Let |readable| be |stream|.[=TransformStream/[[readable]]=].
1. Let |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] be a new promise.
1. Let |flushPromise| be the result of performing
|controller|.[=TransformStreamDefaultController/[[flushAlgorithm]]=].
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|).
1. Return the result of [=reacting=] to |flushPromise|:
1. [=React=] to |flushPromise|:
1. If |flushPromise| was fulfilled, then:
1. If |readable|.[=ReadableStream/[[state]]=] is "`errored`", throw
1. If |readable|.[=ReadableStream/[[state]]=] is "`errored`", [=reject=]
|controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with
|readable|.[=ReadableStream/[[storedError]]=].
1. Perform !
[$ReadableStreamDefaultControllerClose$](|readable|.[=ReadableStream/[[controller]]=]).
1. Otherwise:
1. Perform ! [$ReadableStreamDefaultControllerClose$](|readable|.[=ReadableStream/[[controller]]=]).
1. [=Resolve=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with undefined.
1. If |flushPromise| was rejected with reason |r|, then:
1. Perform ! [$TransformStreamError$](|stream|, |r|).
1. Throw |readable|.[=ReadableStream/[[storedError]]=].
1. Perform ! [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |r|).
1. [=Reject=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with |r|.
1. Return |controller|.[=TransformStreamDefaultController/[[finishPromise]]=].
</div>

<h4 id="ts-default-source-abstract-ops">Default sources</h4>

The following abstract operation is used to implement the [=underlying source=] for the [=readable
side=] of [=transform streams=].

<div algorithm>
<dfn abstract-op lt="TransformStreamDefaultSourceCancelAlgorithm"
id="transform-stream-default-source-cancel">TransformStreamDefaultSourceCancelAlgorithm(|stream|,
|reason|)</dfn> performs the following steps:

1. Let |controller| be |stream|.[=TransformStream/[[controller]]=].
1. If |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] is not undefined, return
|controller|.[=TransformStreamDefaultController/[[finishPromise]]=].
1. Let |writable| be |stream|.[=TransformStream/[[writable]]=].
1. Let |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] be a new promise.
1. Let |cancelPromise| be the result of performing
|controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=], passing |reason|.
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|).
1. [=React=] to |cancelPromise|:
1. If |cancelPromise| was fulfilled, then:
1. If |writable|.[=WritableStream/[[state]]=] is "`errored`", [=reject=]
|controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with
|writable|.[=WritableStream/[[storedError]]=].
1. Otherwise:
1. Perform ! [$WritableStreamDefaultControllerErrorIfNeeded$](|writable|.[=WritableStream/[[controller]]=], |reason|).
1. Perform ! [$TransformStreamUnblockWrite$](|stream|).
1. [=Resolve=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with undefined.
1. If |cancelPromise| was rejected with reason |r|, then:
1. Perform ! [$WritableStreamDefaultControllerErrorIfNeeded$](|writable|.[=WritableStream/[[controller]]=], |r|).
1. Perform ! [$TransformStreamUnblockWrite$](|stream|).
1. [=Reject=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with |r|.
1. Return |controller|.[=TransformStreamDefaultController/[[finishPromise]]=].
</div>

<div algorithm>
<dfn abstract-op lt="TransformStreamDefaultSourcePullAlgorithm"
id="transform-stream-default-source-pull">TransformStreamDefaultSourcePullAlgorithm(|stream|)</dfn>
Expand Down Expand Up @@ -7118,9 +7217,10 @@ reason.
<div algorithm="create a TransformStream">
To <dfn export for="TransformStream" lt="set up|setting up">set up</dfn> a
newly-[=new|created-via-Web IDL=] {{TransformStream}} |stream| given an algorithm <dfn export
for="TransformStream/set up"><var>transformAlgorithm</var></dfn> and an optional algorithm <dfn
export for="TransformStream/set up"><var>flushAlgorithm</var></dfn>, perform the following steps.
|transformAlgorithm| and, if given, |flushAlgorithm|, may return a promise.
for="TransformStream/set up"><var>transformAlgorithm</var></dfn>, an optional algorithm <dfn
export for="TransformStream/set up"><var>flushAlgorithm</var></dfn>, and an optional algorithm <dfn
export for="TransformStream/set up"><var>cancelAlgorithm</var></dfn>, perform the following steps.
|transformAlgorithm| and, if given, |flushAlgorithm| and |cancelAlgorithm|, may return a promise.

1. Let |writableHighWaterMark| be 1.
1. Let |writableSizeAlgorithm| be an algorithm that returns 1.
Expand All @@ -7136,12 +7236,18 @@ reason.
null otherwise. If this throws an exception |e|, return [=a promise rejected with=] |e|.
1. If |result| is a {{Promise}}, then return |result|.
1. Return [=a promise resolved with=] undefined.
1. Let |cancelAlgorithmWrapper| be an algorithm that runs these steps given a value |reason|:
1. Let |result| be the result of running |cancelAlgorithm| given |reason|, if |cancelAlgorithm|
was given, or null otherwise. If this throws an exception |e|, return
[=a promise rejected with=] |e|.
1. If |result| is a {{Promise}}, then return |result|.
1. Return [=a promise resolved with=] undefined.
1. Let |startPromise| be [=a promise resolved with=] undefined.
1. Perform ! [$InitializeTransformStream$](|stream|, |startPromise|, |writableHighWaterMark|,
|writableSizeAlgorithm|, |readableHighWaterMark|, |readableSizeAlgorithm|).
1. Let |controller| be a [=new=] {{TransformStreamDefaultController}}.
1. Perform ! [$SetUpTransformStreamDefaultController$](|stream|, |controller|,
|transformAlgorithmWrapper|, |flushAlgorithmWrapper|).
|transformAlgorithmWrapper|, |flushAlgorithmWrapper|, |cancelAlgorithmWrapper|).

Other specifications should be careful when constructing their
<i>[=TransformStream/set up/transformAlgorithm=]</i> to avoid [=in parallel=] reads from the given
Expand Down
2 changes: 2 additions & 0 deletions reference-implementation/lib/Transformer.webidl
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ dictionary Transformer {
TransformerStartCallback start;
TransformerTransformCallback transform;
TransformerFlushCallback flush;
TransformerCancelCallback cancel;
any readableType;
any writableType;
};

callback TransformerStartCallback = any (TransformStreamDefaultController controller);
callback TransformerFlushCallback = Promise<undefined> (TransformStreamDefaultController controller);
callback TransformerTransformCallback = Promise<undefined> (any chunk, TransformStreamDefaultController controller);
callback TransformerCancelCallback = Promise<undefined> (any reason);
Loading

0 comments on commit 007d729

Please sign in to comment.