I migrated from 1.x to 2.x, replacing Subscription with Disposable, and I'd like to cancel the current subscription before a new one starts. But I get a crash on RxCachedThreadScheduler-n when calling Disposable.dispose(). I've checked #4807, which suggests it may be a timeout problem, but I'm sure my API is fast and won't time out. How can I resolve this?
E/AndroidRuntime: FATAL EXCEPTION: RxCachedThreadScheduler-1
Process: com.machipopo.swag, PID: 30241
java.io.InterruptedIOException: thread interrupted
at okio.Timeout.throwIfReached(Timeout.java:145)
at okio.Okio$2.read(Okio.java:136)
at okio.AsyncTimeout$2.read(AsyncTimeout.java:211)
at okio.RealBufferedSource.request(RealBufferedSource.java:71)
at okio.RealBufferedSource.require(RealBufferedSource.java:64)
at okio.RealBufferedSource.readHexadecimalUnsignedLong(RealBufferedSource.java:270)
at okhttp3.internal.http.Http1xStream$ChunkedSource.readChunkSize(Http1xStream.java:441)
at okhttp3.internal.http.Http1xStream$ChunkedSource.read(Http1xStream.java:422)
at okio.RealBufferedSource.read(RealBufferedSource.java:50)
at okio.RealBufferedSource.exhausted(RealBufferedSource.java:60)
at okio.InflaterSource.refill(InflaterSource.java:101)
at okio.InflaterSource.read(InflaterSource.java:62)
at okio.GzipSource.read(GzipSource.java:80)
at okio.RealBufferedSource.request(RealBufferedSource.java:71)
at okhttp3.logging.HttpLoggingInterceptor.intercept(HttpLoggingInterceptor.java:225)
at okhttp3.RealCall$ApplicationInterceptorChain.proceed(RealCall.java:187)
at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:160)
at okhttp3.RealCall.execute(RealCall.java:57)
at com.test.api.ApiService.get(ApiService.java:145)
at com.test.UserApi$1.subscribe(UserApi.java:63)
if (mSubscriptionLoadMe != null && !mSubscriptionLoadMe.isUnsubscribed())
mSubscriptionLoadMe.unsubscribe();
mSubscriptionLoadMe = UserApi.getUserInfo(this, userId)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
onNext();
onCompleted();
onError();
});
if (mSubscriptionLoadMe != null && !mSubscriptionLoadMe.isDisposed())
mSubscriptionLoadMe.dispose();
mSubscriptionLoadMe = UserApi.getUserInfo(this, userId)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<String>() {
onNext();
onComplete();
onError();
});
I've added timeout settings to the OkHttpClient, but it didn't help (HTTP_TIMEOUT = 180 seconds).
new OkHttpClient.Builder()
.addInterceptor(new HttpLoggingInterceptor().setLevel(debugLevel))
.addNetworkInterceptor(new StethoInterceptor())
.connectTimeout(ApiService.HTTP_TIMEOUT, TimeUnit.SECONDS)
.writeTimeout(ApiService.HTTP_TIMEOUT, TimeUnit.SECONDS)
.readTimeout(ApiService.HTTP_TIMEOUT, TimeUnit.SECONDS)
.build();
Maybe your thread is in an interrupted state when you subscribe.
How do I fix that? Or how can I cancel an API call subscription in 2.x?
Calling Thread.interrupted() before you subscribe will clear the interrupt state. Otherwise, this seems to be an issue with OkHttp, and you have a better chance of getting an answer by asking them about the situation.
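To illustrate the suggestion above, here is a minimal plain-JDK sketch of how the static Thread.interrupted() call reads and resets the interrupt flag. The class and method names are made up for illustration, and whether a leftover interrupt is really the cause in this case is only a hypothesis from the thread.

```java
// Plain-JDK demonstration; class and method names are illustrative only.
class InterruptFlagDemo {

    /** Clears a pending interrupt on the current thread and reports whether one was set. */
    static boolean clearPendingInterrupt() {
        // Thread.interrupted() is static: it returns the flag and resets it to false.
        return Thread.interrupted();
    }

    public static void main(String[] args) {
        // Simulate a thread that was interrupted earlier (assumption: e.g. by a previous cancel).
        Thread.currentThread().interrupt();

        boolean wasInterrupted = clearPendingInterrupt();
        boolean stillInterrupted = Thread.currentThread().isInterrupted();

        System.out.println("was interrupted: " + wasInterrupted);
        System.out.println("still interrupted: " + stillInterrupted);
    }
}
```

Note that the flag belongs to the thread that makes the call, so clearing it only helps if it runs on the same thread that later performs the blocking work.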
OK, one more question: is my migration code from 1.x to 2.x correct? I'd like to make sure that there is only one API call at a time.
if (mSubscriptionLoadMe != null && !mSubscriptionLoadMe.isDisposed())
mSubscriptionLoadMe.dispose();
mSubscriptionLoadMe = UserApi.getUserInfo(this, userId)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<String>() {
onNext();
onComplete();
onError();
});
Yes, the patterns remained largely the same.
Got it, thanks for your help.
override fun getCities(): Observable<List<City>> {
return syncCities().publish { Observable.merge(it, databaseManager.getCities().takeUntil(it)) }
.filter { it.isNotEmpty() }
.distinct()
.onErrorResumeNext(databaseManager.getCities())
}
private fun syncCities(): Observable<List<City>> {
return Observable.error { Throwable("Error") }
}
I'm getting
FATAL EXCEPTION: RxCachedThreadScheduler-1
Process: .debug, PID: 27335
java.lang.Throwable: Error
when running the code above. Could anyone point out what is wrong there?
EDIT: getCities() is called with subscribeOn(io()) and observeOn(mainThread()), of course.
@michaldrabik What's your full exception stack trace?
@enginebai Pasted here: http://pastebin.com/GHjJWtiS
When I remove takeUntil() everything works fine. Might be me missing something...
This doesn't work for me either. I wrote a simple test app for RxJava2, and the following code causes the application to crash:
private Disposable subscription;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
subscription = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Thread.sleep(5000);
if (!e.isDisposed()) {
e.onNext(25);
e.onComplete();
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("MainActivity","Value: "+integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e("MainActivity","Error: "+throwable.toString());
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
subscription.dispose();
}
I start a simple async task in onCreate() and want to unsubscribe (dispose) in the activity's onDestroy() call (Android), but subscription.dispose() causes the following crash (it does not happen if Thread.sleep() has already finished):
12-19 15:11:56.797 5041-5058/com.example.don.rxjava2test E/AndroidRuntime: FATAL EXCEPTION: RxCachedThreadScheduler-1
Process: com.example.don.rxjava2test, PID: 5041
java.lang.InterruptedException
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:371)
at java.lang.Thread.sleep(Thread.java:313)
at com.example.don.rxjava2test.MainActivity$3.subscribe(MainActivity.java:28)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:10514)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
at io.reactivex.Scheduler$1.run(Scheduler.java:134)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
at java.lang.Thread.run(Thread.java:761)
I expect to see my error log, but the application simply crashes.
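The stack trace above shows the work running inside a FutureTask on a ScheduledThreadPoolExecutor, so the InterruptedException is consistent with the task being cancelled with interruption while it sleeps. The following is a plain-JDK analogy of that mechanism, not RxJava's actual code; the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Plain-JDK analogy; names are illustrative only.
class CancelInterruptDemo {

    /** Runs a sleeping task, cancels it with interruption, and returns what the task observed. */
    static String cancelSleepingTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>("not finished");

        Future<?> future = executor.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(5000);          // stands in for the blocking work
                outcome.set("completed");
            } catch (InterruptedException ex) {
                outcome.set("interrupted");  // the blocking call fails once the thread is interrupted
            } finally {
                finished.countDown();
            }
        });

        try {
            started.await();                      // make sure the task is running
            future.cancel(true);                  // true = interrupt the worker thread
            finished.await(2, TimeUnit.SECONDS);  // wait for the task to react
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();   // restore the caller's interrupt flag
        } finally {
            executor.shutdownNow();
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println("task outcome: " + cancelSleepingTask());
    }
}
```

In this sketch the task catches the exception itself. In the test app above nothing catches it inside subscribe(), and by then the subscriber has already been disposed, so the exception has no subscriber left to go to.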
@don11995 If you don't care about such exceptions then you can suppress them via:
RxJavaPlugins.setErrorHandler(Functions.<Throwable>emptyConsumer());
@akarnokd, thank you, your solution works, but what if I want to handle this error in the subscriber?
Also, this is not called if the InterruptedException happens:
RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e("MainActivity","Uncaught: "+throwable.toString());
}
});
Such errors happen after the lifecycle of Subscribers and you can't handle them there. Cancelling a Subscriber is an indication that you no longer want to receive any events.
That signal should be used to suppress the errors instead of conflating them with fundamentally undeliverable ones (exception during onError, after onComplete, etc.). Sending the two different types of exceptions to a single callback means that you can neither crash the app to indicate a programming problem, because it might just be someone who unsubscribed, nor simply ignore all errors sent to the plugin, because it might be a programming problem.
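One way to act on that distinction inside a single global callback is to classify each error before deciding what to do with it. This is a plain-JDK sketch of such a check, which a handler passed to RxJavaPlugins.setErrorHandler could call. The class and method names are hypothetical, and treating every interruption as ignorable is an assumption that may not suit every app.

```java
import java.io.InterruptedIOException;

// Hypothetical helper; not part of RxJava.
class UndeliverableErrorFilter {

    /**
     * Returns true if the error, or any exception in its cause chain, is an interruption.
     * Walking the chain also covers errors that arrive wrapped in another exception.
     */
    static boolean causedByInterruption(Throwable error) {
        int depth = 0; // guard against unusually long or cyclic cause chains
        for (Throwable t = error; t != null && depth < 16; t = t.getCause(), depth++) {
            // Caveat: java.net.SocketTimeoutException extends InterruptedIOException,
            // so socket timeouts match this check as well.
            if (t instanceof InterruptedException || t instanceof InterruptedIOException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException(new InterruptedIOException("thread interrupted"));
        Throwable bug = new IllegalStateException("programming error");

        System.out.println("wrapped interruption: " + causedByInterruption(wrapped));
        System.out.println("programming error: " + causedByInterruption(bug));
    }
}
```

A handler built on this could log and drop errors for which the check returns true and forward everything else to the thread's uncaught exception handler, so real bugs still crash the app.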
Closing via #5075 for now. Let us know if 2.0.6 fixes this for you (tomorrow).
@akarnokd 2.0.6 still doesn't resolve this issue for me. I just disposed an okhttp call but instead of unsubscribing normally, it just crashed.
Here is the crash log.
FATAL EXCEPTION: RxCachedThreadScheduler-1
Process: com.crazyhitty.chdev.ks.rssmanagerlib.demo, PID: 27708
io.reactivex.exceptions.UndeliverableException: java.io.InterruptedIOException: thread interrupted
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:43)
at io.reactivex.Observable.subscribe(Observable.java:10700)
at io.reactivex.internal.operators.observable.ObservableZip$ZipCoordinator.subscribe(ObservableZip.java:110)
at io.reactivex.internal.operators.observable.ObservableZip.subscribeActual(ObservableZip.java:72)
at io.reactivex.Observable.subscribe(Observable.java:10700)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
at io.reactivex.Scheduler$1.run(Scheduler.java:138)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
at java.lang.Thread.run(Thread.java:761)
Caused by: java.io.InterruptedIOException: thread interrupted
at okio.Timeout.throwIfReached(Timeout.java:146)
at okio.Okio$2.read(Okio.java:135)
at okio.AsyncTimeout$2.read(AsyncTimeout.java:236)
at okio.RealBufferedSource.request(RealBufferedSource.java:66)
at okio.RealBufferedSource.require(RealBufferedSource.java:59)
at okio.RealBufferedSource.readHexadecimalUnsignedLong(RealBufferedSource.java:284)
at okhttp3.internal.http1.Http1Codec$ChunkedSource.readChunkSize(Http1Codec.java:444)
at okhttp3.internal.http1.Http1Codec$ChunkedSource.read(Http1Codec.java:425)
at okio.RealBufferedSource.read(RealBufferedSource.java:45)
at okio.RealBufferedSource.request(RealBufferedSource.java:66)
at okio.RealBufferedSource.require(RealBufferedSource.java:59)
at okio.GzipSource.consumeHeader(GzipSource.java:114)
at okio.GzipSource.read(GzipSource.java:73)
at okio.RealBufferedSource.request(RealBufferedSource.java:66)
at okio.RealBufferedSource.rangeEquals(RealBufferedSource.java:387)
at okio.RealBufferedSource.rangeEquals(RealBufferedSource.java:371)
at okhttp3.internal.Util.bomAwareCharset(Util.java:412)
at okhttp3.ResponseBody.string(ResponseBody.java:173)
at com.crazyhitty.chdev.ks.rssmanager.RssReader$1.subscribe(RssReader.java:48)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:10700)
at io.reactivex.internal.operators.observable.ObservableZip$ZipCoordinator.subscribe(ObservableZip.java:110)
at io.reactivex.internal.operators.observable.ObservableZip.subscribeActual(ObservableZip.java:72)
at io.reactivex.Observable.subscribe(Observable.java:10700)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
at io.reactivex.Scheduler$1.run(Scheduler.java:138)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
at java.lang.Thread.run(Thread.java:761)
@crazyhitty looks like you have an Observable.create() around the okhttp blocking call. You have to check for cancellation before emitting any error if you don't want to receive such errors:
Observable.create(emitter -> {
    // ...
    try {
        responsebody.string();
    } catch (IOException ex) {
        // InterruptedIOException is an IOException; signal it only if still subscribed
        if (!emitter.isDisposed()) {
            emitter.onError(ex);
        }
        return;
    }
})
@akarnokd You are a genius mate, thanks for such an easy solution 👍
@crazyhitty since version 2.1.1 tryOnError is available:
The emitter API (such as FlowableEmitter, SingleEmitter, etc.) now features a new method, tryOnError that tries to emit the Throwable if the sequence is not cancelled/disposed. Unlike the regular onError, if the downstream is no longer willing to accept events, the method returns false and doesn't signal an UndeliverableException.