Fp-ts: Naive Task parallel implementation

Created on 26 Sep 2018 · 5 comments · Source: gcanti/fp-ts

I haven't found anything for executing Tasks in parallel with a concurrency limit. Because I don't want to switch to Fluture, I'm sharing my parallel implementation here. Feedback is welcome.

@gcanti would this be something for fp-ts?

// code compatible with [email protected]
import { delay, Task } from 'fp-ts/lib/Task';

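export const parallel = <A>(tasks: Task<A>[], limit: number) =>
  new Task<A[]>(() => new Promise((resolve, reject) => {
    // A limit below 1 would never start a job, so the promise would never settle.
    if (!(limit >= 1)) return reject(new RangeError('limit must be at least 1'));

    const out: A[] = [];
    let running = 0;
    let failed = false;

    // Store the result at the task's original index, then free the slot.
    const onResolve = (index: number) => (result: A) => {
      out[index] = result;
      running--;
      poll();
    };

    // The first failure rejects the whole run and stops new jobs from starting;
    // jobs that are already in flight are not cancelled.
    const onReject = (err: any) => {
      failed = true;
      reject(err);
    };

    // Resolve only once the queue is empty and nothing is still running.
    const onFinish = () => {
      if (running > 0) return;
      resolve(out);
    };

    // Wrap each task lazily so that it starts only when a slot is free.
    const jobs = tasks.map((task, i) => () => task.run().then(onResolve(i), onReject));

    // Start queued jobs until the limit is reached or the queue is empty.
    const poll = () => {
      if (running >= limit || failed) return;

      const job = jobs.shift();
      if (!job) return onFinish();

      running++;
      job();

      poll();
    };

    poll();
  }));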

parallel([
  delay(0, 'task1').map(console.log),
  delay(500, 'task2').map(console.log),
  delay(250, 'task3').map(console.log),
  delay(100, 'task4').map(console.log)
], 2).run();

parallel([
  delay(200, 'task3').chain(() => new Task<string>(() => { throw new Error('stop'); })),
  delay(0, 'task1'),
  delay(100, 'task2')
], 2).run().catch((err) => console.error(err));
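
One way to check that a limiter like the one above really respects `limit` is to wrap every task in a counter that records the peak number of tasks running at the same time. The following is only a sketch under stated assumptions: tasks are modelled as plain `() => Promise<A>` thunks instead of the fp-ts `Task` class, and `Thunk`, `withPeakTracking` and `sleep` are hypothetical names introduced for illustration, not part of fp-ts.

```typescript
// Assumption: a task is a plain promise-returning thunk. With the Task class
// used above, call `task.run()` where this sketch calls `thunk()`.
type Thunk<A> = () => Promise<A>;

// Hypothetical helper (not part of fp-ts): wraps each thunk so that the
// number of simultaneously running thunks is counted.
function withPeakTracking<A>(thunks: Thunk<A>[]): { tracked: Thunk<A>[]; peak: () => number } {
  let running = 0;
  let peak = 0;
  const tracked = thunks.map((thunk): Thunk<A> => () => {
    running++;
    peak = Math.max(peak, running);
    // Decrement on success and on failure, then pass the outcome through.
    return thunk().then(
      (a) => { running--; return a; },
      (err) => { running--; throw err; }
    );
  });
  return { tracked, peak: () => peak };
}

// Hypothetical stand-in for `delay`: resolves with `value` after `ms` milliseconds.
const sleep = <A>(ms: number, value: A): Thunk<A> => () =>
  new Promise<A>((resolve) => setTimeout(() => resolve(value), ms));

// Unlimited baseline: Promise.all starts everything at once, so the recorded
// peak equals the number of tasks.
const { tracked, peak } = withPeakTracking([sleep(10, 'a'), sleep(20, 'b'), sleep(5, 'c')]);
const baseline = Promise.all(tracked.map((t) => t())).then((results) => ({
  results,
  peak: peak()
}));
```

Passing the tracked tasks to a limiter with a limit of 2 should then report a peak of 2 instead of 3.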

All 5 comments

@mlegenhausen thanks for sharing. Looks like a "taskified" version of p-map

// code compatible with [email protected]
import pmap from 'p-map'

const parallel = <A>(tasks: Task<A>[], limit: number): Task<A[]> => {
  return new Task(() => pmap(tasks, t => t.run(), { concurrency: limit }))
}

Another combinator

// code compatible with [email protected]
import { array, chunksOf } from '../src/Array'
import { HKT, Type, Type2, URIS, URIS2 } from '../src/HKT'
import { Monad, Monad1, Monad2 } from '../src/Monad'
import { sequence } from '../src/Traversable'

/**
 * Like `sequence` but actions are batched in chunks.
 * Hint: you can use `Array.chunksOf` to provide the `mas` argument
 */
export function batchSequence<M extends URIS2>(
  M: Monad2<M>
): <L, A>(mas: Array<Array<Type2<M, L, A>>>) => Type2<M, L, Array<A>>
export function batchSequence<M extends URIS>(M: Monad1<M>): <A>(mas: Array<Array<Type<M, A>>>) => Type<M, Array<A>>
export function batchSequence<M>(M: Monad<M>): <A>(mas: Array<Array<HKT<M, A>>>) => HKT<M, Array<A>> {
  const sequenceM = sequence(M, array)
  return <A>(mas: Array<Array<HKT<M, A>>>) =>
    mas.reduce(
      (b: HKT<M, Array<A>>, chunk: Array<HKT<M, A>>) => M.chain(b, xs => M.map(sequenceM(chunk), ys => xs.concat(ys))),
      M.of([])
    )
}

import { TaskEither, taskEither } from '../src/TaskEither'

declare const task1: TaskEither<string, number>
declare const task2: TaskEither<string, number>
declare const task3: TaskEither<string, number>
declare const task4: TaskEither<string, number>
declare const task5: TaskEither<string, number>
batchSequence(taskEither)(chunksOf([task1, task2, task3, task4, task5], 2))
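
Batching is not the same thing as a sliding concurrency limit. The `reduce` above chains the chunks one after another, so a chunk starts only once every action of the previous chunk has finished, even if some slots have been idle for a while. A rough sketch of that behaviour, assuming the actions of one chunk run concurrently and using plain `() => Promise<A>` thunks as a stand-in for `Task`; `Thunk`, `chunksOfLocal`, `batchSequenceThunks` and `job` are hypothetical names used only here:

```typescript
type Thunk<A> = () => Promise<A>;

// Local stand-in for `chunksOf`: splits `as` into slices of length `n`.
const chunksOfLocal = <A>(as: A[], n: number): A[][] => {
  const out: A[][] = [];
  for (let i = 0; i < as.length; i += n) out.push(as.slice(i, i + n));
  return out;
};

// Mirrors the reduce above: run one chunk concurrently, wait for all of it,
// append its results, then move on to the next chunk.
const batchSequenceThunks = <A>(chunks: Thunk<A>[][]): Thunk<A[]> => () =>
  chunks.reduce(
    (acc: Promise<A[]>, chunk) =>
      acc.then((xs) => Promise.all(chunk.map((t) => t())).then((ys) => xs.concat(ys))),
    Promise.resolve<A[]>([])
  );

// Records when each job starts and ends so the scheduling becomes visible.
const events: string[] = [];
const job = (name: string, ms: number): Thunk<string> => () => {
  events.push('start ' + name);
  return new Promise<string>((resolve) =>
    setTimeout(() => {
      events.push('end ' + name);
      resolve(name);
    }, ms)
  );
};

// 'a' finishes quickly, but 'c' still has to wait for the slow 'b',
// because 'a' and 'b' form one chunk.
const batched = batchSequenceThunks(chunksOfLocal([job('a', 5), job('b', 60), job('c', 5)], 2))();
```

With a sliding limit of 2, `c` would start as soon as `a` finished. With batching it starts only after `b`, which is simpler but can leave a slot unused.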

@gcanti @mlegenhausen some of the snippets here could probably be generalized and become part of fp-ts-contrib?

@giogonzo (the batch combinator) https://github.com/gcanti/fp-ts-contrib/pull/4

Update for [email protected]

// code compatible with [email protected]
import { Task } from 'fp-ts/lib/Task'
import pmap from 'p-map'

export function parallel<A>(tasks: Array<Task<A>>, limit: number): Task<Array<A>> {
  return () => pmap(tasks, t => t(), { concurrency: limit })
}
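
For anyone who would rather avoid the extra dependency, the same idea can be sketched with a small pool of workers that each pull the next unstarted task. This is only an illustration, not how p-map is implemented: `Thunk` is a local alias that mirrors the way tasks are called as `t()` above, and `parallelLimit` and `sleep` are hypothetical names.

```typescript
// Local alias: a task is called as `t()` and returns a promise.
type Thunk<A> = () => Promise<A>;

function parallelLimit<A>(tasks: Array<Thunk<A>>, limit: number): Thunk<Array<A>> {
  return () => {
    // A limit below 1 (or NaN) could never start a task.
    if (!(limit >= 1)) return Promise.reject(new RangeError('limit must be at least 1'));

    const out: A[] = new Array(tasks.length);
    let next = 0;
    let failed = false;

    // Each worker runs one task at a time and then picks up the next
    // unstarted index, until none are left or some task has failed.
    const worker = (): Promise<void> => {
      if (failed || next >= tasks.length) return Promise.resolve();
      const i = next++;
      return tasks[i]().then(
        (a) => {
          out[i] = a; // keep results in input order
          return worker();
        },
        (err) => {
          failed = true; // stop other workers from starting new tasks
          throw err;
        }
      );
    };

    // Start at most `limit` workers; Promise.all rejects on the first failure.
    const workers: Promise<void>[] = [];
    for (let w = 0; w < Math.min(limit, tasks.length); w++) workers.push(worker());
    return Promise.all(workers).then(() => out);
  };
}

// Hypothetical helper: resolves with `value` after `ms` milliseconds.
const sleep = <A>(ms: number, value: A): Thunk<A> => () =>
  new Promise<A>((resolve) => setTimeout(() => resolve(value), ms));

const demo = parallelLimit([sleep(30, 'task1'), sleep(10, 'task2'), sleep(20, 'task3')], 2)();
```

Results come back in input order regardless of which task finishes first. Tasks that are already running when another one fails are not cancelled; only the start of new tasks is suppressed.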