Elixir: async_stream + Stream.run is blocking

Created on 1 Dec 2018 · 5 Comments · Source: elixir-lang/elixir

Just wondering why this code:

items = Repo.stream(Something)

stream = Task.Supervisor.async_stream_nolink(TaskSupervisor, items, fn item -> 
  do_some_io_with(item)
end)

Repo.transaction(fn -> Stream.run(stream) end)

blocks the caller?

Stream.run(stream) assumes that I don't care about the results, so there should be no need to wait until all tasks have finished. So why does it act the same as Enum.to_list(stream)? Is there some way to do this without blocking, other than starting a separate process?

All 5 comments

Yes, Stream.run always blocks the caller. The difference from Enum.to_list is that it doesn't have to build a list, because you don't care about the results. Stream is always about laziness; it never starts processes for you. In your case, you can start a separate process yourself.
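A small sketch of both points, using plain Task.async_stream instead of the question's Repo.stream and Task.Supervisor.async_stream_nolink so it runs without Ecto (the range, the 100 ms sleep and max_concurrency: 2 are illustrative choices). Building the stream starts no tasks; Stream.run starts them and only returns once all have finished, it just throws the results away.

```elixir
parent = self()

# Building the stream is lazy: no task is started here.
stream =
  Task.async_stream(
    1..4,
    fn i ->
      send(parent, {:started, i})
      Process.sleep(100)
      i * 2
    end,
    max_concurrency: 2
  )

# Wait briefly for any {:started, _} message; none should arrive yet.
started_before_run =
  receive do
    {:started, _} -> true
  after
    50 -> false
  end

IO.puts("tasks started before Stream.run? #{started_before_run}")

# Stream.run enumerates the stream, so it blocks until all 4 tasks finish
# (at least ~200 ms, since at most 2 run at a time) and discards the results.
{micros, :ok} = :timer.tc(fn -> Stream.run(stream) end)
IO.puts("Stream.run returned :ok after ~#{div(micros, 1000)} ms")

# Enum.to_list runs the tasks again and also builds the list of results.
results = Enum.to_list(stream)
IO.inspect(results)
# => [ok: 2, ok: 4, ok: 6, ok: 8]
```

Both calls wait for the same work; the only saving with Stream.run is that no result list is accumulated.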

For example, if Stream.run started another process, your code sample wouldn't work, because the new process wouldn't be inside the transaction (transactions are per process).
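As an analogy only (it does not touch Ecto), the process dictionary is also per process, so it shows why work moved into another process loses the caller's context. The :in_transaction key is hypothetical and merely stands in for "this process is inside Repo.transaction".

```elixir
# Hypothetical marker standing in for "inside Repo.transaction".
Process.put(:in_transaction, true)

in_caller = Process.get(:in_transaction)

# Work done in a new process (as it would be if Stream.run spawned one)
# does not see the caller's per-process state.
in_other_process =
  Task.async(fn -> Process.get(:in_transaction) end)
  |> Task.await()

IO.inspect({in_caller, in_other_process})
# => {true, nil}
```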

@josevalim thank you.

"Stream is always about laziness"

Yes, but async_stream is about asynchronous processing, so I hoped to take advantage of that. But it looks like it always waits for the results, even if I don't care about them.

Ah, I see. If you want it to fully run as a separate tree of processes, then you can look at GenStage and Flow.

"you can look at GenStage and Flow."

@josevalim As far as I know, they don't work well with Repo.stream:

https://github.com/elixir-lang/gen_stage/issues/150
https://elixirforum.com/t/repo-stream-flow-from-enumerable/16477/3

Is this still relevant?

We will likely support this in Ecto 3.1.

Ah, yes, you are correct. So you will have to start the process yourself for now but, as I said, if we started the process automatically, which is what Flow/GenStage does, then it wouldn't work with Ecto either. :D
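A minimal sketch of starting the process yourself, as suggested above. It is plain Elixir so it runs without a database: the range and Process.sleep/1 are stand-ins for Repo.stream(Something) and the hypothetical do_some_io_with/1, and an inline supervisor replaces the question's named TaskSupervisor. With Ecto, the Repo.transaction call would go inside the started process, so the transaction belongs to the process that actually consumes the stream.

```elixir
# One supervisor for both the background runner and the stream's tasks.
{:ok, sup} = Task.Supervisor.start_link()
parent = self()

# Stand-in for Repo.stream(Something): any enumerable works here.
items = 1..4

{:ok, _runner} =
  Task.Supervisor.start_child(sup, fn ->
    stream =
      Task.Supervisor.async_stream_nolink(sup, items, fn item ->
        # Stand-in for do_some_io_with(item).
        Process.sleep(100)
        item
      end)

    # With Ecto this line would be Repo.transaction(fn -> Stream.run(stream) end),
    # so the blocking happens in this runner process, not in the caller.
    :ok = Stream.run(stream)
    send(parent, :done)
  end)

IO.puts("caller continues while the runner works")

# Only for the demo: wait for the runner so the script can check it finished.
status =
  receive do
    :done -> :finished
  after
    5_000 -> :timeout
  end

IO.inspect(status)
```

In real code the caller would usually not wait at all; the receive is only there to show that the background run completes.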

The feature you are requesting has not been implemented yet, so Ecto 3.1 it is indeed.
