The following code:
val disposable = Single.just(0)
.map {
doSleep(1000)
it + 1
}
.subscribeOn(Schedulers.io())
.subscribe( { it ->
System.out.println("${System.currentTimeMillis()} onSuccess: $it")
})
System.out.println("${System.currentTimeMillis()} dispose()")
disposable.dispose()
System.out.println("${System.currentTimeMillis()} disposeDone")
doSleep(Integer.MAX_VALUE)
Results in the following output:
1534868048066 dispose()
1534868048066 disposeDone
1534868049638 onSuccess: 1
This feels counter-intuitive to me. I was under the assumption that once dispose() is called, onNext/onSuccess would be guaranteed to not be called.
The same example with Observable instead of Single works well. Looking at the source:
This seems to be the main difference with ObservableJust: SingleJust emits the item unconditionally, even if the observer is disposed already.
Is this the intended behaviour of SingleJust or is it a bug ?
Apparently it's a duplicate of https://github.com/ReactiveX/RxJava/issues/5697. I'm keeping this issue open the time to convince myself.
EDIT: with retrospect, I'm not sure at all it was related
How is doSleep implemented? If it ignores the InterruptedException then no wonder the success value is delivered. Single operators generally don't eagerly check for isDisposed when they relay onSuccess. If you need that eagerness, apply onTerminateDetach.
Indeed, I did write doSleep specifically to ignore the InterruptedException:
fun doSleep(ms: Int) {
val start = System.currentTimeMillis()
while (System.currentTimeMillis() - start < ms) {
try {
Thread.sleep(System.currentTimeMillis() - start )
} catch (e: Exception) {
}
}
}
My concern is more about making sure I don't do anything illegal in onSuccess. Coming from Android, it's a common pattern to call Disposable.dispose() from Activity.onDestroy() to make sure that no callback is modifying the activity state too late.
I always assumed this was safe. I'm trying to evaluate the implications of onSuccess still being called after dispose()
Would it make sense to add a comment to the dispose javadoc ? Right now the documentation is pretty minimalist:
/**
* Dispose the resource, the operation should be idempotent.
*/
void dispose();
I'm thinking about adding something along the lines of describing what a "resource" is and making it explicit that some values can still be delivered for a short period of time after dispose() is called.Or is that already somewhere else ?
The keywords are "best effort", see #1590 for example. Disposable is a general mechanism for both stopping flows, cancelling tasks and freeing resources.
Ok I guess... I'm wondering how come this is not a bigger thing. Almost every Android dev I know relies on dispose() actually cancelling onSuccess()/onNext() immediatly. Looks like the "best effort" is good enough to make it work well in most cases...
Most Android dev uses observeOn(AndroidSchedulers.mainThread()) which prevents the delivery in such situations anyway.
That's the thing, I'm able to reproduce this in an Android context using observeOn(AndroidSchedulers.mainThread()):
val disposable = Single.just(0).map {
System.out.println("RXTEST ${System.currentTimeMillis()} sleep")
doSleep(2000)
System.out.println("RXTEST ${System.currentTimeMillis()} it=$it")
it + 1
}.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe( { it ->
System.out.println("RXTEST ${System.currentTimeMillis()} onSuccess")
})
System.out.println("RXTEST ${System.currentTimeMillis()} dispose()")
doSleep(500)
disposable.dispose()
System.out.println("RXTEST ${System.currentTimeMillis()} disposeDone")
(this code is called from a click handler so I'm pretty sure it's the main thread)
I will sometimes get "onSuccess" displayed and sometimes not...
With that sleep, it should not happen. Does it happen with observeOn(Schedulers.single()) ?
It does not seem to happen with Schedulers.single(). Tried 30 times and I have never had onSuccess called after dispose. It's hard to be 100% sure though because when using the AndroidSchedulers.mainThread() it works "most" of the time as well. I'll keep trying and post my findings here
I think what happens is that the Android handler callback is posted from a Schedulers.io() context in SingleObserveOn:
public void onSuccess(T value) {
// this code is called outside the main thread because of a previous subscribeOn(Schedulers.io())
this.value = value;
Disposable d = scheduler.scheduleDirect(this);
// At this point, the handler might have scheduled the callback already so disposing it does not abort the callback
DisposableHelper.replace(this, d);
}
DisposableHelper.replace(this, d); will call handler.removeCallbacks() but there's no guarantee that the main thread won't have started running it before it's actually removed.
Is there a way I can enforce that .dispose() will consistently abort the delivery of onSuccess ?
onTerminateDetach.
Thanks, will do. I'm closing this issue.
Most helpful comment
onTerminateDetach.