Skip to content

Commit

Permalink
transmit source event stream errors as an error payload
Browse files Browse the repository at this point in the history
1. akin to a request error, without any data field
2. and close the response event stream ensuring no further events are sent
  • Loading branch information
yaacovCR committed Nov 26, 2024
1 parent 079167d commit f7e151d
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 24 deletions.
25 changes: 18 additions & 7 deletions src/execution/__tests__/cancellation-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { assert, expect } from 'chai';
import { describe, it } from 'mocha';

import { expectJSON } from '../../__testUtils__/expectJSON.js';
import { expectPromise } from '../../__testUtils__/expectPromise.js';
import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js';

import { isAsyncIterable } from '../../jsutils/isAsyncIterable.js';
Expand Down Expand Up @@ -902,9 +901,15 @@ describe('Execute: Cancellation', () => {

abortController.abort();

await expectPromise(subscription.next()).toRejectWith(
'This operation was aborted',
);
expectJSON(await subscription.next()).toDeepEqual({
value: { errors: [{ message: 'This operation was aborted' }] },
done: false,
});

expectJSON(await subscription.next()).toDeepEqual({
value: undefined,
done: true,
});
});

it('should stop the execution when aborted during subscription returned asynchronously', async () => {
Expand Down Expand Up @@ -941,8 +946,14 @@ describe('Execute: Cancellation', () => {

abortController.abort();

await expectPromise(subscription.next()).toRejectWith(
'This operation was aborted',
);
expectJSON(await subscription.next()).toDeepEqual({
value: { errors: [{ message: 'This operation was aborted' }] },
done: false,
});

expectJSON(await subscription.next()).toDeepEqual({
value: undefined,
done: true,
});
});
});
22 changes: 22 additions & 0 deletions src/execution/__tests__/mapAsyncIterable-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,26 @@ describe('mapAsyncIterable', () => {
});
});

it('calls onError with iterator errors', async () => {
async function* source() {
yield 1;
throw new Error('Oops');
}

const doubles = mapAsyncIterable(
source(),
(x) => Promise.resolve(x + x),
() => Promise.resolve(0),
);

expect(await doubles.next()).to.deep.equal({ value: 2, done: false });
expect(await doubles.next()).to.deep.equal({ value: 0, done: false });
expect(await doubles.next()).to.deep.equal({
value: undefined,
done: true,
});
});

it('calls done when completes', async () => {
async function* source() {
yield 1;
Expand All @@ -100,6 +120,7 @@ describe('mapAsyncIterable', () => {
const doubles = mapAsyncIterable(
source(),
(x) => Promise.resolve(x + x),
undefined,
() => {
done = true;
},
Expand All @@ -126,6 +147,7 @@ describe('mapAsyncIterable', () => {
const doubles = mapAsyncIterable(
source(),
(x) => Promise.resolve(x + x),
undefined,
() => {
done = true;
},
Expand Down
30 changes: 19 additions & 11 deletions src/execution/__tests__/subscribe-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,7 @@ describe('Subscription Publish Phase', () => {
});
});

it('should not trigger when subscription is thrown', async () => {
it('should terminate when subscription is thrown', async () => {
const pubsub = new SimplePubSub<Email>();
const subscription = createSubscription(pubsub);
assert(isAsyncIterable(subscription));
Expand Down Expand Up @@ -1050,15 +1050,14 @@ describe('Subscription Publish Phase', () => {

payload = subscription.next();

// Throw error
let caughtError;
try {
/* c8 ignore next 2 */
await subscription.throw('ouch');
} catch (e) {
caughtError = e;
}
expect(caughtError).to.equal('ouch');
const thrown = subscription.throw('ouch');

expectJSON(await thrown).toDeepEqual({
done: false,
value: {
errors: [{ message: 'Unexpected error value: "ouch"' }],
},
});

expect(await payload).to.deep.equal({
done: true,
Expand Down Expand Up @@ -1230,7 +1229,16 @@ describe('Subscription Publish Phase', () => {
},
});

await expectPromise(subscription.next()).toRejectWith('test error');
expectJSON(await subscription.next()).toDeepEqual({
done: false,
value: {
errors: [
{
message: 'test error',
},
],
},
});

expect(await subscription.next()).to.deep.equal({
done: true,
Expand Down
1 change: 1 addition & 0 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2137,6 +2137,7 @@ function mapSourceToResponse(
};
return validatedExecutionArgs.perEventExecutor(perEventExecutionArgs);
},
(error) => ({ errors: [locatedError(error, undefined)] }),
() => abortSignalListener?.disconnect(),
);
}
Expand Down
20 changes: 14 additions & 6 deletions src/execution/mapAsyncIterable.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';

/**
* Given an AsyncIterable and a callback function, return an AsyncIterator
* which produces values mapped via calling the callback function.
* Given an AsyncIterable and a onValue function, return an AsyncIterator
* which produces values mapped via calling the onValue function.
*/
export function mapAsyncIterable<T, U, R = undefined>(
iterable: AsyncGenerator<T, R, void> | AsyncIterable<T>,
callback: (value: T) => PromiseOrValue<U>,
onValue: (value: T) => PromiseOrValue<U>,
onError: (error: any) => PromiseOrValue<U> = (error: any) => {
throw error;
},
onDone?: (() => void) | undefined,
): AsyncGenerator<U, R, void> {
const iterator = iterable[Symbol.asyncIterator]();

let errored = false;

async function mapResult(
promise: Promise<IteratorResult<T, R>>,
): Promise<IteratorResult<U, R>> {
Expand All @@ -23,12 +28,13 @@ export function mapAsyncIterable<T, U, R = undefined>(
}
value = result.value;
} catch (error) {
errored = true;
onDone?.();
throw error;
return { value: await onError(error), done: false };
}

try {
return { value: await callback(value), done: false };
return { value: await onValue(value), done: false };
} catch (error) {
/* c8 ignore start */
// FIXME: add test case
Expand All @@ -46,7 +52,9 @@ export function mapAsyncIterable<T, U, R = undefined>(

return {
async next() {
return mapResult(iterator.next());
return errored
? Promise.resolve({ value: undefined as any, done: true })
: mapResult(iterator.next());
},
async return(): Promise<IteratorResult<U, R>> {
// If iterator.return() does not exist, then type R must be undefined.
Expand Down

0 comments on commit f7e151d

Please sign in to comment.