Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewriting retry operator #246

Merged
merged 4 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ iterable is. This makes it difficult to test objectively. On one hand, `iter-ops
has embedded optimization for wrapping an asynchronous iterable, so if we test that against a standard async iterable
for `rxjs`, we get the result as above:

- Testing against `rxjs` asynchronous pipeline, we went from ~7x better performance (in RxJs v6) to ~0.5x performance
against the latest RxJs v7.
- Testing against `rxjs` with a single empty subscription - we went from ~15x better performance (in RxJs v6), to just ~2x
better performance against the latest RxJs v7
- Testing against `rxjs` asynchronous pipeline, we went from ~7x better performance (in RxJs v6) to ~0.5x performance
against the latest RxJs v7.
- Testing against `rxjs` with a single empty subscription - we went from ~15x better performance (in RxJs v6), to just ~2x
better performance against the latest RxJs v7

The above points at tremendous performance optimizations done in v7 of RxJs.
2 changes: 1 addition & 1 deletion benchmarks/src/async.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {testIterOps} from './tests/iter-ops';
import {testRXJS} from './tests/rxjs';
import {toAsync} from '../../';
import {toAsync} from '../../src';

// tslint:disable:no-console

Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/tests/iter-ops.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {type UnknownIterable, filter, map, pipe, toArray} from '../../../';
import {type UnknownIterable, filter, map, pipe, toArray} from '../../../src';

export async function testIterOps(input: UnknownIterable<number>) {
const start = Date.now();
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/tests/rxjs.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type {UnknownIterable} from '../../../';
import type {UnknownIterable} from '../../../src';
import {filter, firstValueFrom, from, map, toArray} from 'rxjs';

export async function testRXJS(
Expand Down
63 changes: 38 additions & 25 deletions src/ops/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,20 @@ export function retry<T>(attempts: number): Operation<T, T>;
* throw new Error(`fail-${value}`); // throw for all even numbers
* }
* }),
* retry((i, a, s) => {
* // i = index, a = attempts, s = state
* return a < 3; // make up to 3 retry attempts
* retry(({attempt}) => {
* // available properties: {attempt, error, index, state}
* return attempt < 3; // make up to 3 retry attempts
* })
* );
*
* console.log(...i); //=> 1, 3, 5, 7, 9
* ```
*
* The callback is only invoked when there is a failure, and it receives:
* The callback is only invoked when there is a failure, and it receives an object with the following properties:
*
* - `attempt` - attempt index so far (starts with 0)
* - `error` - the error that was thrown
* - `index` - index of the iterable value that threw/rejected
* - `attempts` - number of retry attempts made so far (starts with 0)
* - `state` - state for the entire iteration session
*
* Note that retries deplete values prior the operator that threw the error,
Expand All @@ -70,11 +72,12 @@ export function retry<T>(attempts: number): Operation<T, T>;
* @category Sync+Async
*/
export function retry<T>(
cb: (
index: number,
attempts: number,
state: IterationState
) => boolean | Promise<boolean>
retry: (cb: {
attempt: number;
error: any;
index: number;
state: IterationState;
}) => boolean | Promise<boolean>
): Operation<T, T>;

export function retry(...args: unknown[]) {
Expand All @@ -83,7 +86,12 @@ export function retry(...args: unknown[]) {

type Retry<T> =
| number
| ((index: number, attempts: number, state: IterationState) => T);
| ((cb: {
attempt: number;
error: any;
index: number;
state: IterationState;
}) => T);

function retrySync<T>(
iterable: Iterable<T>,
Expand All @@ -99,7 +107,7 @@ function retrySync<T>(
const state: IterationState = {};
let index = 0;
const cb = typeof retry === 'function' && retry;
let attempts = 0;
let attempt = 0;
const retriesNumber = !cb && retry > 0 ? retry : 0;
let leftTries = retriesNumber;
return {
Expand All @@ -108,13 +116,15 @@ function retrySync<T>(
try {
const a = i.next();
index++;
attempts = 0;
attempt = 0;
leftTries = retriesNumber;
return a;
} catch (err) {
const r = cb && cb(index++, attempts++, state);
} catch (error) {
const r = cb && cb({attempt, index, error, state});
attempt++;
index++;
if (!r && !leftTries--) {
throw err;
throw error;
}
}
} while (true);
Expand All @@ -138,38 +148,41 @@ function retryAsync<T>(
const state: IterationState = {};
let index = 0;
const cb = typeof retry === 'function' && retry;
let attempts = 0;
let attempt = 0;
const retriesNumber = !cb && retry > 0 ? retry : 0;
let leftTries = retriesNumber;
return {
next(): Promise<IteratorResult<T>> {
return i.next().then(
(a) => {
index++;
attempts = 0;
attempt = 0;
leftTries = retriesNumber;
return a;
},
(e) => {
(error) => {
if (cb) {
const b = (f: any) =>
f
? this.next()
: // eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
Promise.reject(e);
const r = cb(
index++,
attempts++,
Promise.reject(error);
const r = cb({
attempt,
index,
error,
state
) as Promise<boolean>;
}) as Promise<boolean>;
attempt++;
index++;
return isPromiseLike(r) ? r.then(b) : b(r);
}
if (leftTries) {
leftTries--;
return this.next();
}
// eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
return Promise.reject(e);
return Promise.reject(error);
}
);
}
Expand Down
24 changes: 12 additions & 12 deletions test/ops/retry/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ export default () => {
it('must retry while resolves with true', async () => {
const i = pipeAsync(
source,
retry((index, attempts) => Promise.resolve(attempts < 3))
retry(({attempt}) => Promise.resolve(attempt < 3))
);
expect(await _asyncValues(i)).to.eql([3, 4, 5]);
});
it('must retry while returns true', async () => {
const i = pipeAsync(
source,
retry((index, attempts) => attempts < 3)
retry(({attempt}) => attempt < 3)
);
expect(await _asyncValues(i)).to.eql([3, 4, 5]);
});
Expand All @@ -42,7 +42,7 @@ export default () => {
it('must throw when failed for Promise', async () => {
const i = pipeAsync(
source,
retry((index, attempts) => Promise.resolve(attempts < 1))
retry(({attempt}) => Promise.resolve(attempt < 1))
);
let err: any;
try {
Expand All @@ -55,7 +55,7 @@ export default () => {
it('must throw when failed for boolean', async () => {
const i = pipeAsync(
source,
retry((index, attempts) => attempts < 1)
retry(({attempt}) => attempt < 1)
);
let err: any;
try {
Expand All @@ -66,23 +66,23 @@ export default () => {
expect(err?.message).to.eql('Throw for value 2');
});
it('must pass correct callback parameters', async () => {
const indexes: number[] = [],
attempts: number[] = [];
const ind: number[] = [],
att: number[] = [];
const i = pipeAsync(
[11, 20, 33, 40, 55, 60, 77, 80, 99],
tap((value) => {
if (value % 2 === 0) {
throw new Error(`fail-${value}`); // throw for all even numbers
}
}),
retry((idx, att) => {
indexes.push(idx);
attempts.push(att);
return att < 1; // retry once
retry(({attempt, index}) => {
att.push(attempt);
ind.push(index);
return attempt < 1; // retry once
})
);
expect(await _asyncValues(i)).to.eql([11, 33, 55, 77, 99]);
expect(indexes).to.eql([1, 3, 5, 7]);
expect(attempts).to.eql([0, 0, 0, 0]);
expect(ind).to.eql([1, 3, 5, 7]);
expect(att).to.eql([0, 0, 0, 0]);
});
};
16 changes: 8 additions & 8 deletions test/ops/retry/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,23 @@ export default () => {
expect([...i]).to.eql([4, 5]);
});
it('must retry on callback result', () => {
const indexes: number[] = [],
attempts: number[] = [];
const ind: number[] = [],
att: number[] = [];
const i = pipe(
[11, 20, 33, 40, 55, 60, 77, 80, 99],
tap((value) => {
if (value % 2 === 0) {
throw new Error(`fail-${value}`); // throw for all even numbers
}
}),
retry((idx, att) => {
indexes.push(idx);
attempts.push(att);
return att < 1; // retry once
retry(({attempt, index}) => {
att.push(attempt);
ind.push(index);
return attempt < 1; // retry once
})
);
expect([...i]).to.eql([11, 33, 55, 77, 99]);
expect(indexes).to.eql([1, 3, 5, 7]);
expect(attempts).to.eql([0, 0, 0, 0]);
expect(ind).to.eql([1, 3, 5, 7]);
expect(att).to.eql([0, 0, 0, 0]);
});
};
6 changes: 3 additions & 3 deletions yarn.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading