Zio: Add ZStream#effectAsync*

Created on 20 Jun 2019  路  3Comments  路  Source: zio/zio

Following up on @jdegoes's comment on gitter, this family of combinators should adapt callback-based side-effecting APIs to streams. Instead of this familiar pattern:

ZStream.unwrap {
  for {
    output <- Queue.bounded[Take[A]](64)
    runtime <- ZIO.runtime[Any]
    _ <- Task {
      sideEffectingApi.register(new Callback {
        def onData(a: A) = runtime.unsafeRun(output.offer(Take.Value(a)))
        def onError(t: Throwable) = runtime.unsafeRun(output.offer(Take.Fail(t)))
      })
    }
  } yield ZStream.fromQueue(output).unTake
}

We should be able to do:

ZStream.effectAsync { cb: (Task[A] => Unit) =>
  sideEffectingApi.register(new Callback {
    def onData(a: A) = cb(Task.succeed(a))
    def onError(t: Throwable) = cb(Task.fail(t))
  })
}

Would be interesting to see if we can translate all of the variants of ZIO.effectAsync*.

good first issue help wanted stream

All 3 comments

I would like to give this a go :-)

@dariusrobson please do! Let me know if you need any help

@iravid if you don't mind, can I take a look?

Was this page helpful?
0 / 5 - 0 ratings