Short description of the issue:
.share(replay: 1, scope: .forever) behaves differently after a subscription is disposed, depending on whether the source stream completes or not.
Expected outcome:
The behaviour is consistent: stale values are replayed even after a stream is disposed, regardless of whether it completes or not.
It is also what is stated in the documentation from what I understand:
What actually happens:
There are two cases:
Self contained code example that reproduces the issue:
The first case:
let xs: Observable<Double> = Observable.create { observer in
print("Performing work ...")
observer.onNext(Date().timeIntervalSince1970)
observer.onCompleted()
return Disposables.create()
}
.share(replay: 1, scope: .forever)
xs.subscribe(onNext: { print("first: next \($0)") }).dispose()
xs.subscribe(onNext: { print("second: next \($0)") })
The second case (only observer.onCompleted() was removed):
let xs: Observable<Double> = Observable.create { observer in
print("Performing work ...")
observer.onNext(Date().timeIntervalSince1970)
return Disposables.create()
}
.share(replay: 1, scope: .forever)
xs.subscribe(onNext: { print("first: next \($0)") }).dispose()
xs.subscribe(onNext: { print("second: next \($0)") })
RxSwift
4.1.2
Platform/Environment
How easy is to reproduce? (chances of successful reproduce after running the self contained code)
Xcode version:
Xcode 9.3
Installation method:
I have multiple versions of Xcode installed:
Level of RxSwift knowledge:
Hi @TomasLinhart,
This looks interesting, but I'm not sure this is a bug.
Could you please check how RxJava and RxJS behave? I think they should behave similarly.
.share(replay: 1, scope: .forever) is unique to RxSwift so there is no such thing in RxJS or RxJava.
If you use the same underlying implementation in RxJS (using multicast), it behaves the same as in RxSwift: https://jsfiddle.net/g5pj065m/3204/
In RxJava there is no multicast, so it is not possible to use the same implementation, but there are other ways to achieve the same result. There is a .cache() operator, which is very similar to .share(replay: 1, scope: .forever) except that it replays all values.
val o1 = Observable.create<Int> {
it.onNext(Random().nextInt(100))
}.cache()
o1.subscribe({ println(it) }).dispose()
o1.subscribe({ println(it) })
This behaves correctly as expected. It replays correctly after disposing.
It is also possible to use a combination of .replay(1) and .autoConnect().
val o1 = Observable.create<Int> {
it.onNext(Random().nextInt(100))
}.replay(1).autoConnect()
o1.subscribe({ println(it) }).dispose()
o1.subscribe({ println(it) })
This also behaves correctly as expected. It replays correctly after disposing.
There is an extension to RxJava called RxReplayingShare that behaves somewhat like the RxSwift approach, but it fetches the value again if the original stream completed, so it is somewhat more consistent than the RxSwift approach.
I understand that it is not necessarily a bug, but it is nevertheless confusing and not very intuitive, and as you can see, RxJava behaves more intuitively.
Oh, sorry,
I meant to say:
RxSwift => .share(replay: X, scope: .whileConnected) should behave as:
shareReplay https://github.com/ReactiveX/rxjs/blob/7873f8a6d84fcc66c48045208914719ad50e26b2/compat/operators/shareReplay.ts
.share(), .publish().refCount()
RxSwift => .share(replay: X, scope: .forever) should behave as Rx.NET sharing operators.
I'm not sure whether it is clear from the above statement, but .forever is basically a relic kept because of Rx.NET, and its behavior doesn't make much sense to me.
You should use .whileConnected in most cases and you should experience RxJS and RxJava compatible behavior.
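For comparison, here is the reproduction from the report with only the scope changed to .whileConnected. This is a minimal sketch rather than code from the thread: the makeShared helper, the completes flag and the workCount counter are hypothetical names added for illustration, and it assumes RxSwift 4.x. As far as I understand the .whileConnected semantics, the replay buffer belongs to a single connection, so the second subscriber should trigger the work again in both the completing and the non-completing variant.

```swift
import RxSwift

// Hypothetical counter, used only to observe how often the source closure runs.
var workCount = 0

// Hypothetical helper: builds the stream from the report, optionally completing.
func makeShared(completes: Bool) -> Observable<Int> {
    return Observable<Int>.create { observer in
        workCount += 1
        observer.onNext(workCount)
        if completes { observer.onCompleted() }
        return Disposables.create()
    }
    .share(replay: 1, scope: .whileConnected)
}

// Run the same subscribe / dispose / subscribe sequence for both variants.
for completes in [true, false] {
    let xs = makeShared(completes: completes)
    xs.subscribe(onNext: { print("first: next \($0)") }).dispose()
    _ = xs.subscribe(onNext: { print("second: next \($0)") })
}
```

If the two variants really are consistent under .whileConnected, workCount grows by two per variant, because no stale value survives the first disposal.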
I haven't seen any tests for RxJava that describe the behavior that you are expecting with sharing operators and completion.
The reported issue says
.share(replay: 1, scope: .forever) does not always replay a stale value
and you are saying the problem is that this RxSwift code doesn't behave as expected:
let xs: Observable<Double> = Observable.create { observer in
print("Performing work ...")
observer.onNext(Date().timeIntervalSince1970)
observer.onCompleted()
return Disposables.create()
}
.share(replay: 1, scope: .forever)
xs.subscribe(onNext: { print("first: next \($0)") }).dispose()
xs.subscribe(onNext: { print("second: next \($0)") })
What is the equivalent of this code for some other implementation that demonstrates the behavior that you want?
I see, this RxJava implementation does what I would expect and want:
val o1 = Observable.create<Int> {
it.onNext(Random().nextInt(100))
}.replay(1).autoConnect()
o1.subscribe({ println(it) }).dispose()
o1.subscribe({ println(it) })
@TomasLinhart as far as I can remember, autoConnect() never disposes the connection, so I don't see how we can compare them reasonably :/ The main issue is how the replay part behaves after disposal, and I think that dispose/unsubscribe never gets called on replay(1) in your RxJava example.
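To make that point concrete, here is a rough RxSwift analog of a connection that is never torn down. It is a sketch under assumptions, not a port of the RxJava code: it uses replay(1) with a manual connect() whose disposable is simply kept alive, and workCount and received are hypothetical names used only to record what happens.

```swift
import RxSwift

// Hypothetical bookkeeping, used only to record what the sketch observes.
var workCount = 0
var received: [String] = []

let source = Observable<Int>.create { observer in
    workCount += 1
    observer.onNext(workCount)
    return Disposables.create()
}

// replay(1) returns a connectable observable; nothing runs until connect().
let shared = source.replay(1)

// Keeping this disposable alive means the upstream subscription is never disposed,
// regardless of how many downstream subscribers come and go.
let connection = shared.connect()

shared.subscribe(onNext: { received.append("first: \($0)") }).dispose()
_ = shared.subscribe(onNext: { received.append("second: \($0)") })

print(received)

// Only this call tears down the upstream subscription.
connection.dispose()
```

Because disposing the first subscriber does not touch the connection here, the second subscriber seeing the buffered value says nothing about how the replay buffer behaves after a real disconnect, which is the case the report is about.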
Not sure what more to do with this.