With GenStage.Flow more involved, it is clear that this pattern won't be covered by GenStage.Flow:

```elixir
collection
|> Enum.map(fn x -> Task.async(fn -> do_something(x) end) end)
|> Enum.map(&Task.await(&1, 5000))
```
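For reference, here is a minimal runnable version of that pattern. `ParallelMapExample` and its `do_something/1` are hypothetical stand-ins; only `Task.async/1` and `Task.await/2` are real APIs. It starts one linked task per element and then awaits each one in order, so results come back in input order.

```elixir
defmodule ParallelMapExample do
  # Hypothetical per-item work, standing in for do_something/1 above.
  def do_something(x), do: x * x

  def run(collection) do
    collection
    # Spawn one task per element; each task is linked to the caller.
    |> Enum.map(fn x -> Task.async(fn -> do_something(x) end) end)
    # Await each task (5000 ms per await), preserving input order.
    |> Enum.map(&Task.await(&1, 5000))
  end
end
```

For example, `ParallelMapExample.run([1, 2, 3])` returns `[1, 4, 9]`.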
We should consider including something like this in the standard library. I can think of two approaches:
1. `collection |> Enum.pmap(&do_something/1, 5000)`
2. `collection |> Task.async_many(&do_something/1) |> Task.await_many(5000)`

I am more inclined towards the second. It mirrors `Task.yield_many` nicely and will also give us more control when starting tasks, reducing work. We will also be able to add `Task.Supervisor.async_many` and `Task.Supervisor.async_no_link_many`.
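Option 2 could be sketched on top of the existing `Task` functions roughly as follows. `TaskMany.async_many/2` and `TaskMany.await_many/2` are hypothetical helpers written for illustration, not the proposed standard-library implementation. Note that this `await_many` applies the timeout per task, which is one possible design choice among several.

```elixir
defmodule TaskMany do
  # Hypothetical helpers mirroring option 2; not part of Elixir's Task API.

  @spec async_many(Enumerable.t(), (term -> term)) :: [Task.t()]
  def async_many(enumerable, fun) do
    # Same linking semantics as Task.async/1: every task is linked to the caller.
    Enum.map(enumerable, fn item -> Task.async(fn -> fun.(item) end) end)
  end

  @spec await_many([Task.t()], timeout()) :: [term]
  def await_many(tasks, timeout \\ 5000) do
    # Awaits tasks in the order given; the timeout here is per task, not total.
    Enum.map(tasks, &Task.await(&1, timeout))
  end
end
```

Usage: `[1, 2, 3] |> TaskMany.async_many(&(&1 * 10)) |> TaskMany.await_many(5000)` returns `[10, 20, 30]`.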
PS: Note this is a discussion. If a PR is sent at this point, there is no guarantee it will be merged.
The problem I see with this approach is that in many cases it may actually be slower than a simple Enum.map/2. We need really good documentation telling people they must make sure they provide enough work for each process, or an option to Enum.pmap/3 to chunk the data beforehand.
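The chunking idea can be sketched like this: start one task per chunk instead of one per element, so each process does enough work to outweigh the cost of spawning it and sending messages. `ChunkedPmap.pmap/4` and its `chunk_size` parameter are hypothetical; `Enum.chunk_every/2` is the current name of the chunking function (older Elixir versions used `Enum.chunk/2`).

```elixir
defmodule ChunkedPmap do
  # Hypothetical helper: one task per chunk of `chunk_size` elements.
  def pmap(enumerable, fun, chunk_size, timeout \\ 5000)
      when is_integer(chunk_size) and chunk_size > 0 do
    enumerable
    # Group elements so each task gets a batch of work.
    |> Enum.chunk_every(chunk_size)
    # Each task maps sequentially over its own chunk.
    |> Enum.map(fn chunk -> Task.async(fn -> Enum.map(chunk, fun) end) end)
    # Await chunks in order and flatten them back into one list.
    |> Enum.flat_map(&Task.await(&1, timeout))
  end
end
```

For example, `ChunkedPmap.pmap(1..5, &(&1 * 2), 2)` runs three tasks (chunks `[1, 2]`, `[3, 4]`, `[5]`) and returns `[2, 4, 6, 8, 10]`. The right chunk size still has to be found by measuring.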
I'm just afraid people will simply change Enum.map/2 to Enum.pmap/2, see that it's slower, and be very disappointed.
Agreed with @michalmuskala, this is a "dangerous" feature to implement right because you really have to know what you're doing, and you have to know a bunch of stuff like how to handle errors in spawned processes (they're linked I guess?) and so on.
I just want to point out that all of those arguments could be (and were) used against Task. "It could be used wrong" is definitely a given, but I wouldn't like to limit everyone using it correctly because we are afraid people will use it without properly measuring or reading the docs.
Sorry, I didn't mean to discourage this feature, I like the idea :) I meant that we should have a good balance of configurability (because a bunch of things could go wrong and we want to provide ways to keep them from going wrong) and "do the right thing"™-ness. Also, if we provide Enum.pmap/2, would we provide something like Enum.peach/2 as well? Asking because the "do these things in parallel but who cares about the result" pattern could be common. I take it for granted that we're gonna provide Stream.pmap/2 as well, right? :) So many questions!
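A "peach" that runs side effects in parallel and discards the results could be sketched with `Task.async_stream/3` (available since Elixir 1.4) piped into `Stream.run/1`. `PEach.peach/2` is a hypothetical name. `Stream.run/1` still consumes every result, so the caller waits for all tasks and a crash or timeout is not silently lost.

```elixir
defmodule PEach do
  # Hypothetical "peach": run `fun` on every item concurrently, discard results.
  def peach(enumerable, fun) do
    enumerable
    # Bounded by System.schedulers_online() concurrent tasks by default.
    |> Task.async_stream(fun)
    # Forces the stream to completion and returns :ok.
    |> Stream.run()
  end
end
```

Usage: with an `Agent` as a shared counter, `PEach.peach(1..5, fn x -> Agent.update(agent, &(&1 + x)) end)` returns `:ok`, and the counter ends at 15.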
> Enum.pmap/2, would we provide something like Enum.peach/2
That's why I prefer the Task approach: it is more explicit in terms of semantics. Since it is async_something, it follows the same semantics as Task.async, both in terms of linking and in requiring a call to Task.await_many.
> I take it for granted that we're gonna provide Stream.pmap/2
At least this one does not make sense. Stream computes item by item, which means you can't leverage parallelism on Stream.pmap/2. If you try to do that, you will end up with something like GenStage.Flow. :)
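To see why, it helps to trace how a lazy `Stream` pipeline pulls data. The hypothetical `StreamOrder.trace/1` records, via an `Agent`, the order in which two `Stream.map/2` stages touch each element. Each element goes through the whole pipeline before the next one is fetched, so there is never more than one item in flight for a `pmap` stage to run concurrently.

```elixir
defmodule StreamOrder do
  # Records the order in which a two-stage lazy Stream pipeline touches items.
  def trace(enumerable) do
    {:ok, agent} = Agent.start_link(fn -> [] end)
    record = fn event -> Agent.update(agent, &[event | &1]) end

    result =
      enumerable
      |> Stream.map(fn x ->
        record.({:stage1, x})
        x
      end)
      |> Stream.map(fn x ->
        record.({:stage2, x})
        x * 2
      end)
      |> Enum.to_list()

    # Events were prepended, so reverse to get chronological order.
    events = agent |> Agent.get(& &1) |> Enum.reverse()
    Agent.stop(agent)
    {result, events}
  end
end
```

`StreamOrder.trace([1, 2])` returns `{[2, 4], [{:stage1, 1}, {:stage2, 1}, {:stage1, 2}, {:stage2, 2}]}`: item 1 finishes both stages before item 2 is even read.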
Your reply makes me think the best way to go is 2 indeed.
Ah, I see now about Stream.pmap/2; sorry, I didn't think it through. Yes, from my perspective option 2 looks more formal/strict and appears to leave less room for mistakes :)
I'll throw in a 👍 for option 2. I take it that it would retain the order of the collection, unlike Task.yield_many. If so, would there be room for a function that doesn't care about retaining the order, or is that use case covered by Task.yield_many?
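For the order-insensitive case, later Elixir versions let `Task.async_stream/3` take an `ordered: false` option, which emits each result as soon as its task finishes instead of in input order. `UnorderedMap.run/2` is a hypothetical wrapper used only to illustrate that option.

```elixir
defmodule UnorderedMap do
  # Hypothetical wrapper: results arrive in completion order, not input order.
  def run(enumerable, fun) do
    enumerable
    |> Task.async_stream(fun, ordered: false)
    # Each successful task yields {:ok, value}; a crash would exit the caller.
    |> Enum.map(fn {:ok, value} -> value end)
  end
end
```

Because the output order depends on scheduling, callers that need a stable order should sort the results or keep the default `ordered: true`.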
Probably a case of YAGNI.
It seems kind of a slippery slope; you know the first request is going to be for Task.chunk.
I think if you go into it knowing that managing a worker pool is out of scope, Task.async_many is fine. I would avoid Enum.pmap at all costs. Everyone should write that in their first week so they learn why it's not a great idea.
Please see #5367. We have decided to go with something that is based on GenStage.Flow.map and computes a bounded set of tasks in a streaming fashion. It is the most robust implementation of everything proposed. We have decided to call it Task.pmap because we will also add a Task.Supervisor.pmap variant, which spawns supervised tasks. The rationale for going this way came from observing the usage and benefits of Flow. It is quite different from the Enum.pmap everyone writes, since the number of tasks started is bounded. Closing this in favor of the PR.
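As far as I know, the bounded, streaming function discussed here shipped in Elixir 1.4 as `Task.async_stream/3` (with `Task.Supervisor.async_stream/4` as the supervised variant), rather than under the name `Task.pmap`. A minimal sketch of bounded-concurrency mapping with it follows. `BoundedMap.run/3` is a hypothetical wrapper; `:max_concurrency` caps how many tasks run at once, and results are emitted in input order by default.

```elixir
defmodule BoundedMap do
  # Hypothetical wrapper around Task.async_stream/3.
  # At most `max` tasks are alive at any time; the rest wait their turn.
  def run(enumerable, fun, max \\ System.schedulers_online()) do
    enumerable
    |> Task.async_stream(fun, max_concurrency: max, timeout: 5000)
    # Each element of the stream is {:ok, result} for a successful task.
    |> Enum.map(fn {:ok, value} -> value end)
  end
end
```

For example, `BoundedMap.run(1..4, &(&1 * &1), 2)` returns `[1, 4, 9, 16]` while never running more than two tasks concurrently, which is exactly the difference from an unbounded `Enum.pmap`.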