Skip to content

Commit

Permalink
Dispose of subscription on cleanup even while waiting for next event (#…
Browse files Browse the repository at this point in the history
…6794)

* failing test

* dispose on cleanup and better handle

* changeset
  • Loading branch information
enisdenjo authored Dec 24, 2024
1 parent 8e15907 commit 0ce1bd2
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 35 deletions.
5 changes: 5 additions & 0 deletions .changeset/quick-swans-swim.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@graphql-tools/executor-apollo-link': patch
---

Dispose of subscription on cleanup even while waiting for next event
68 changes: 34 additions & 34 deletions packages/executors/apollo-link/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,44 +1,44 @@
import * as apolloImport from '@apollo/client';
import { Executor, fakePromise, isAsyncIterable } from '@graphql-tools/utils';
import { ExecutionRequest, Executor, isAsyncIterable } from '@graphql-tools/utils';

const apollo: typeof apolloImport = (apolloImport as any)?.default ?? apolloImport;

function createApolloRequestHandler(executor: Executor): apolloImport.RequestHandler {
return function ApolloRequestHandler(
operation: apolloImport.Operation,
): apolloImport.Observable<apolloImport.FetchResult> {
return function ApolloRequestHandler(operation) {
return new apollo.Observable(observer => {
fakePromise()
.then(() =>
executor({
document: operation.query,
variables: operation.variables,
operationName: operation.operationName,
extensions: operation.extensions,
context: operation.getContext(),
}),
)
.then(results => {
if (isAsyncIterable(results)) {
return fakePromise().then(async () => {
for await (const result of results) {
if (observer.closed) {
return;
}
observer.next(result);
}
observer.complete();
});
} else if (!observer.closed) {
observer.next(results);
observer.complete();
}
})
.catch(e => {
if (!observer.closed) {
observer.error(e);
const executionRequest: ExecutionRequest = {
document: operation.query,
variables: operation.variables,
operationName: operation.operationName,
extensions: operation.extensions,
context: operation.getContext(),
};

let disposed = false;
let dispose = () => {
disposed = true;
};
(async function execution() {
const results = await executor(executionRequest);

// request couldve been disposed before getting results
if (disposed) return;

if (isAsyncIterable(results)) {
dispose = () => {
results[Symbol.asyncIterator]().return?.();
};
for await (const result of results) {
observer.next(result);
}
});
} else {
observer.next(results);
}
})()
.then(() => observer.complete())
.catch(e => observer.error(e));

return () => dispose();
});
};
}
Expand Down
28 changes: 27 additions & 1 deletion packages/executors/apollo-link/tests/apollo-link.spec.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { setTimeout } from 'timers/promises';
import { parse } from 'graphql';
import { createSchema, createYoga } from 'graphql-yoga';
import { createSchema, createYoga, Repeater } from 'graphql-yoga';
import { ApolloClient, FetchResult, InMemoryCache } from '@apollo/client/core';
import { buildHTTPExecutor } from '@graphql-tools/executor-http';
import { createDeferred } from '@graphql-tools/utils';
import { testIf } from '../../../testing/utils.js';
import { ExecutorLink } from '../src/index.js';

describe('Apollo Link', () => {
const { promise: waitForPingStop, resolve: pingStop } = createDeferred<void>();
const yoga = createYoga({
logging: false,
maskedErrors: false,
Expand All @@ -21,6 +23,7 @@ describe('Apollo Link', () => {
}
type Subscription {
time: String
ping: String
}
`,
resolvers: {
Expand All @@ -40,6 +43,13 @@ describe('Apollo Link', () => {
},
resolve: str => str,
},
ping: {
subscribe: () =>
new Repeater(async (_pull, stop) => {
await stop;
pingStop();
}),
},
},
},
}),
Expand Down Expand Up @@ -121,4 +131,20 @@ describe('Apollo Link', () => {
readFile: 'Hello World',
});
});
it('should complete subscription even while waiting for events', async () => {
const observable = client.subscribe({
query: parse(/* GraphQL */ `
subscription Ping {
ping
}
`),
});
const sub = observable.subscribe({
next: () => {
// noop
},
});
globalThis.setTimeout(() => sub.unsubscribe(), 0);
await waitForPingStop;
});
});

0 comments on commit 0ce1bd2

Please sign in to comment.