I'm wondering why code like this:

```elixir
items = Repo.stream(Something)

stream =
  Task.Supervisor.async_stream_nolink(TaskSupervisor, items, fn item ->
    do_some_io_with(item)
  end)

Repo.transaction(fn -> Stream.run(stream) end)
```

blocks the caller.
`Stream.run(stream)` implies that I don't care about the results, so there should be no need to wait until all tasks have finished. So why does it act the same as `Enum.to_list(stream)`? Is there a way to do this without blocking, other than starting a separate process?
Yes, `Stream.run` always blocks the caller. The difference from `Enum.to_list` is that it doesn't have to build a list, because you don't care about the results. Stream is always about laziness; it never starts processes for you. In your case, you can start a separate process yourself.
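A minimal, database-free sketch of that suggestion follows. It assumes a plain range in place of `Repo.stream(Something)`, a hypothetical `do_some_io_with` that just sleeps for about 100 ms, and a supervisor started inline instead of the `TaskSupervisor` from the question. It times `Stream.run/1` called directly in the caller against the same stream run inside a task started with `Task.Supervisor.start_child/2`.

```elixir
# Sketch only: a plain range stands in for Repo.stream(Something) and a
# sleep stands in for the hypothetical do_some_io_with/1. No database needed.
{:ok, sup} = Task.Supervisor.start_link()

# Hypothetical I/O: each item takes roughly 100 ms.
do_some_io_with = fn item ->
  Process.sleep(100)
  item
end

# Builds a fresh lazy stream; nothing runs until it is enumerated.
build_stream = fn ->
  Task.Supervisor.async_stream_nolink(sup, 1..5, do_some_io_with, max_concurrency: 5)
end

# 1. Stream.run/1 in the caller returns only after every task has finished.
{blocking_us, :ok} = :timer.tc(fn -> Stream.run(build_stream.()) end)

# 2. Run the same stream in its own supervised task. start_child returns
#    {:ok, pid} right away, so the caller can carry on.
parent = self()

{spawn_us, {:ok, _runner}} =
  :timer.tc(fn ->
    Task.Supervisor.start_child(sup, fn ->
      Stream.run(build_stream.())
      send(parent, :stream_done)
    end)
  end)

IO.puts("Stream.run in caller: #{div(blocking_us, 1000)} ms")
IO.puts("start_child returned after: #{div(spawn_us, 1000)} ms")

# The background work still completes; we wait here only to show that.
background =
  receive do
    :stream_done -> :done
  after
    5_000 -> :timeout
  end

IO.inspect(background, label: "background stream")
```

With Ecto, the transaction would have to be opened inside the spawned function too, for example `Task.Supervisor.start_child(TaskSupervisor, fn -> Repo.transaction(fn -> Stream.run(stream) end) end)`, since the transaction belongs to the process that enumerates the stream.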
Just as an example: if `Stream.run` started another process, your code sample wouldn't work, because the new process wouldn't be inside the transaction (transactions are per process).
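The "per process" point can be illustrated without Ecto. The sketch below is an analogy, not necessarily how Ecto tracks transactions: a process-dictionary entry under the hypothetical key `:in_transaction` stands in for state that belongs to the calling process, and a `Task` started from that process does not see it.

```elixir
# Analogy only: :in_transaction is a hypothetical key standing in for
# per-process state such as an open transaction.
Process.put(:in_transaction, true)
caller_sees = Process.get(:in_transaction)

# Task.async/1 runs the function in a new process with its own dictionary.
task = Task.async(fn -> Process.get(:in_transaction) end)
task_sees = Task.await(task)

IO.inspect({caller_sees, task_sees}, label: "caller vs task")
# prints: caller vs task: {true, nil}
```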
@josevalim Thank you.

> Stream is always about laziness

Yes, but `async_stream` is about asynchronous processing, so I hoped to get its advantages. It looks like it will always wait for the results, even if I don't care about them.
You can look at GenStage and Flow.
@josevalim As far as I know, they don't work well with `Repo.stream`:
https://github.com/elixir-lang/gen_stage/issues/150
https://elixirforum.com/t/repo-stream-flow-from-enumerable/16477/3
Is this still relevant?
We will likely support this in Ecto 3.1.
Ah, yes, you are correct. So you will have to start the process yourself for now but, as I said, if we started the process automatically, which is what Flow/GenStage does, then it wouldn't work with Ecto either. :D
The feature you are requesting has not been implemented yet, so Ecto 3.1 it is.