I haven't found anything that executes Tasks in parallel with a concurrency limit. Because I don't want to switch to Fluture, I'm sharing my parallel implementation with you. Feedback is welcome.
@gcanti would this be something for fp-ts?
// code compatible with fp-ts 1.x
import { delay, Task } from 'fp-ts/lib/Task';
/**
 * Runs `tasks` concurrently while keeping at most `limit` of them in flight.
 *
 * Each result is stored at the index of the task that produced it, so the
 * resolved array follows the order of `tasks`, not the order of completion.
 * The promise rejects with the first failure that is observed; after a failure
 * no further tasks are started (tasks that are already running are not
 * cancelled, their results are simply ignored by the settled promise).
 *
 * @param tasks - tasks to execute
 * @param limit - maximum number of tasks running at the same time; must be
 *   greater than zero
 * @returns a `Task` whose promise resolves to the collected results, or
 *   rejects with a `RangeError` when `limit` is not positive
 */
export const parallel = <A>(tasks: Task<A>[], limit: number) =>
  new Task<A[]>(() => new Promise<A[]>((resolve, reject) => {
    if (limit <= 0) {
      // With a non-positive limit no task could ever be started, which would
      // leave this promise pending forever. Fail fast instead.
      reject(new RangeError(`parallel: limit must be greater than 0, got ${limit}`));
      return;
    }

    const out: A[] = [];
    let next = 0; // index of the next job to start (avoids O(n) `shift()` calls)
    let running = 0;
    let failed = false;

    const onResolve = (index: number) => (result: A) => {
      out[index] = result;
      running--;
      poll();
    };
    const onReject = (err: unknown) => {
      failed = true;
      reject(err);
    };
    // Snapshot of the work to do, taken when the task is run.
    const jobs = tasks.map((task, i) => () => task.run().then(onResolve(i), onReject));

    const poll = () => {
      while (!failed && running < limit) {
        const job = jobs[next];
        if (job === undefined) {
          // Nothing left to start: finish once the last running task is done.
          if (running === 0) resolve(out);
          return;
        }
        next++;
        running++;
        try {
          job();
        } catch (err) {
          // `run()` threw synchronously. When `poll` is invoked from a `.then`
          // callback nobody else would observe this error, so route it to the
          // outer promise explicitly.
          onReject(err);
        }
      }
    };
    poll();
  }));
// Demo 1: four delayed tasks that log their value, executed with a limit of 2.
const loggingTasks = [
  delay(0, 'task1').map(console.log),
  delay(500, 'task2').map(console.log),
  delay(250, 'task3').map(console.log),
  delay(100, 'task4').map(console.log)
];
parallel(loggingTasks, 2).run();

// Demo 2: the first task chains into a task whose body throws 'stop';
// whatever error reaches the rejection handler is printed with console.error.
const throwingTask = delay(200, 'task3').chain(() => new Task<string>(() => { throw new Error('stop'); }));
const tasksWithFailure = [
  throwingTask,
  delay(0, 'task1'),
  delay(100, 'task2')
];
parallel(tasksWithFailure, 2).run().catch((reason) => console.error(reason));
@mlegenhausen thanks for sharing. Looks like a "taskified" version of p-map
// code compatible with fp-ts 1.x
import pmap from 'p-map'
/**
 * Wraps `pmap` in a `Task`: when run, every task's `run()` is handed to `pmap`
 * with `limit` passed as its `concurrency` option.
 */
const parallel = <A>(tasks: Task<A>[], limit: number): Task<A[]> =>
  new Task(() => {
    const runTask = (task: Task<A>) => task.run()
    return pmap(tasks, runTask, { concurrency: limit })
  })
Another combinator
// code compatible with fp-ts 1.x
import { array, chunksOf } from '../src/Array'
import { HKT, Type, Type2, URIS, URIS2 } from '../src/HKT'
import { Monad, Monad1, Monad2 } from '../src/Monad'
import { sequence } from '../src/Traversable'
/**
 * A chunked variant of `sequence`.
 *
 * The chunks in `mas` are chained one after another with `M.chain`; inside a
 * chunk the actions are combined with `sequence(M, array)`. The values of every
 * chunk are appended to the values collected so far, so the result keeps the
 * order of the input.
 *
 * Hint: `Array.chunksOf` can be used to build the `mas` argument.
 */
export function batchSequence<M extends URIS2>(
  M: Monad2<M>
): <L, A>(mas: Array<Array<Type2<M, L, A>>>) => Type2<M, L, Array<A>>
export function batchSequence<M extends URIS>(M: Monad1<M>): <A>(mas: Array<Array<Type<M, A>>>) => Type<M, Array<A>>
export function batchSequence<M>(M: Monad<M>): <A>(mas: Array<Array<HKT<M, A>>>) => HKT<M, Array<A>> {
  const sequenceChunk = sequence(M, array)
  return <A>(mas: Array<Array<HKT<M, A>>>) => {
    // Start from an empty result and extend it chunk by chunk.
    let collected: HKT<M, Array<A>> = M.of<Array<A>>([])
    mas.forEach(chunk => {
      // `sequenceChunk(chunk)` stays inside the chain callback so that a chunk
      // is only sequenced once the previous chunks have produced their values.
      collected = M.chain(collected, xs => M.map(sequenceChunk(chunk), ys => xs.concat(ys)))
    })
    return collected
  }
}
import { TaskEither, taskEither } from '../src/TaskEither'
// Placeholder actions: only their types matter for this example.
declare const task1: TaskEither<string, number>,
  task2: TaskEither<string, number>,
  task3: TaskEither<string, number>,
  task4: TaskEither<string, number>,
  task5: TaskEither<string, number>

// Specialise the combinator to TaskEither, split the five actions with
// `chunksOf(…, 2)` and sequence the resulting chunks.
const sequenceInBatches = batchSequence(taskEither)
const batches = chunksOf([task1, task2, task3, task4, task5], 2)
sequenceInBatches(batches)
@gcanti @mlegenhausen some of the snippets here could probably be generalized and become part of fp-ts-contrib?
@giogonzo (the batch combinator) https://github.com/gcanti/fp-ts-contrib/pull/4
Update for fp-ts 2.x
// code compatible with fp-ts 2.x
import { Task } from 'fp-ts/lib/Task'
import pmap from 'p-map'
/**
 * Builds a `Task` that, when invoked, calls every task in `tasks` through
 * `pmap`, passing `limit` as its `concurrency` option.
 */
export function parallel<A>(tasks: Array<Task<A>>, limit: number): Task<Array<A>> {
  const invoke = (task: Task<A>) => task()
  return () => {
    const options = { concurrency: limit }
    return pmap(tasks, invoke, options)
  }
}
Most helpful comment
@mlegenhausen thanks for sharing. Looks like a "taskified" version of p-map