Following up on @jdegoes's comment on gitter, this family of combinators should adapt callback-based side-effecting APIs to streams. Instead of this familiar pattern:
ZStream.unwrap {
for {
output <- Queue.bounded[Take[A]](64)
runtime <- ZIO.runtime[Any]
_ <- Task {
sideEffectingApi.register(new Callback {
def onData(a: A) = runtime.unsafeRun(output.offer(Take.Value(a)))
def onError(t: Throwable) = runtime.unsafeRun(output.offer(Take.Fail(t)))
})
}
} yield ZStream.fromQueue(output).unTake
}
We should be able to do:
ZStream.effectAsync { cb: (Task[A] => Unit) =>
sideEffectingApi.register(new Callback {
def onData(a: A) = cb(Task.succeed(a))
def onError(t: Throwable) = cb(Task.fail(t))
})
}
Would be interesting to see if we can translate all of the variants of ZIO.effectAsync*.
I would like to give this a go :-)
@dariusrobson please do! Let me know if you need any help
@iravid if you don't mind, can I take a look?