How to create streams that deliver more data after an error event?
According to the Dart documentation, a stream should be able to emit more data after it emits an error (adding an error does not close the stream).
Streams can also deliver error events like it delivers data events. Most streams will stop after the first error, but it is possible to have streams that deliver more than one error, and streams that deliver more data after an error event.
If a stream needs to be transformed and the transformation may throw an error, is it possible to keep the transformed stream from being closed by addError?
stream.transform(transformer);
//or
stream.map(transformation);
That's the default behavior of streams. If you use a stream controller to create the stream, you can keep adding events after adding an error. So, that's one approach:
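// Requires: import 'dart:async';
Stream<Foo> foos() {
  var c = StreamController<Foo>();
  // Run the producer as an immediately invoked async closure.
  () async {
    // compute... events... await something...
    c.add(event);
    // ...
    c.addError(error, stack); // The stream stays open after this.
    // ...
    // await other
  }();
  return c.stream;
}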
Transforming a stream can also preserve both errors and events, as long as the transformer itself can handle and emit errors.
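As a minimal sketch of a transformer that handles its own errors, the example below uses StreamTransformer.fromHandlers and catches whatever the transformation throws, forwarding it with sink.addError so later data still gets through. The names parseInts and runParseDemo are hypothetical and chosen only for illustration.

```dart
import 'dart:async';

/// Hypothetical transformer: parses each string and turns parse failures
/// into error events instead of letting them escape the handler.
StreamTransformer<String, int> parseInts() =>
    StreamTransformer<String, int>.fromHandlers(
      handleData: (String text, EventSink<int> sink) {
        try {
          sink.add(int.parse(text));
        } catch (error, stackTrace) {
          // Emits an error event; the sink is not closed by this call.
          sink.addError(error, stackTrace);
        }
      },
    );

/// Records every data and error event of the transformed stream, in order.
Future<List<String>> runParseDemo() async {
  final log = <String>[];
  await Stream.fromIterable(['1', 'oops', '3'])
      .transform(parseInts())
      .handleError((Object error) => log.add('error'))
      .forEach((value) => log.add('data: $value'));
  return log;
}

Future<void> main() async {
  print(await runParseDemo()); // [data: 1, error, data: 3]
}
```

The listener sees the error between the two data events, and the stream only ends when the source is done.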
However, the async and async* method syntax is not built for handling more than one error.
An await for loop, like Stream.forEach, will exit after the first error, and a yield cannot emit an error, so most errors from async* streams are emitted by throwing and exiting the function, which automatically makes the error the last event of the stream. I'm guessing the latter is your problem.
For the yield, there is a workaround: yield* Stream.error(error, stackTrace); will emit an error on the stream without throwing, which allows you to emit more events afterwards. If your transformation can throw, you then need to catch that error, yield the error on the stream as shown here, and then continue the computation.
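The yield* workaround could look roughly like the sketch below. The halve function stands in for any transformation that can throw, and halved and runHalveDemo are likewise hypothetical names, not part of any library.

```dart
import 'dart:async';

/// Hypothetical transformation that throws for odd input.
int halve(int n) {
  if (n.isOdd) throw ArgumentError.value(n, 'n', 'must be even');
  return n ~/ 2;
}

/// Applies [halve] to every event. A thrown error is caught and emitted
/// as an error event, after which the loop continues with the next value.
Stream<int> halved(Stream<int> source) async* {
  await for (final n in source) {
    try {
      yield halve(n);
    } catch (error, stackTrace) {
      // Emits the error on the stream without leaving the function.
      yield* Stream<int>.error(error, stackTrace);
    }
  }
}

/// Records every data and error event of the transformed stream, in order.
Future<List<String>> runHalveDemo() async {
  final log = <String>[];
  await halved(Stream.fromIterable([2, 3, 4]))
      .handleError((Object error) => log.add('error'))
      .forEach((value) => log.add('data: $value'));
  return log;
}

Future<void> main() async {
  print(await runHalveDemo()); // [data: 1, error, data: 2]
}
```

Note that this only covers errors thrown by the transformation itself. An error event arriving from source still ends the await for loop, which is the case the next paragraph addresses.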
For await for, you do need to encapsulate the error. You can use Result.captureStream (from package:async) to convert a Stream<T> to a Stream<Result<T>> where the Results will contain both values and errors.
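A rough sketch of the Result.captureStream approach follows. It assumes package:async is listed as a dependency; sourceWithError and consume are hypothetical names used only for this example.

```dart
import 'dart:async';
import 'package:async/async.dart';

/// Hypothetical source that emits data, then an error, then more data.
Stream<int> sourceWithError() {
  final controller = StreamController<int>();
  controller
    ..add(1)
    ..addError(StateError('boom'))
    ..add(2)
    ..close();
  return controller.stream;
}

/// Consumes [source] with await for. Errors arrive wrapped in Result
/// objects as ordinary data events, so the loop is not terminated by them.
Future<List<String>> consume(Stream<int> source) async {
  final log = <String>[];
  await for (final result in Result.captureStream(source)) {
    if (result.isError) {
      log.add('error: ${result.asError!.error.runtimeType}');
    } else {
      log.add('data: ${result.asValue!.value}');
    }
  }
  return log;
}

Future<void> main() async {
  print(await consume(sourceWithError()));
}
```

Because every event reaches the loop body as a value, the loop runs until the source closes, and the code decides per event how to react to an error.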
Many thx, @lrhn
I finally realized the stream was being closed by stream.onErrorReturnWith() (from the RxDart package), not by streamController.addError() or sink.addError().
For StreamTransformer<S, T> constructor, the documentation says:
If the transformed stream is listened to multiple times, the onListen callback is called again for each new Stream.listen call. This happens whether the stream is a broadcast stream or not, but the call will usually fail for non-broadcast streams.
For StreamTransformer.fromHandlers(), the documentation says:
Transformers that are constructed this way cannot use captured state if they are used in streams that can be listened to multiple times.
So, basically, a StreamTransformer should not involve any shared-state calculation, and such calculations should be performed by the source/original stream?