RxJava: Fatal exception on RxCachedThreadScheduler-n when calling Disposable.dispose()

Created on 18 Nov 2016 · 20 Comments · Source: ReactiveX/RxJava

Requirement

I am migrating from 1.x to 2.x and have replaced Subscription with Disposable. I'd like to cancel the current subscription before a new one starts, but I get a fatal exception on RxCachedThreadScheduler-n when calling Disposable.dispose(). I've checked #4807, which suggests it may be a timeout problem, but I'm sure my API is fast enough that it won't time out. How can I resolve this problem?

Exception

E/AndroidRuntime: FATAL EXCEPTION: RxCachedThreadScheduler-1
                  Process: com.machipopo.swag, PID: 30241
                  java.io.InterruptedIOException: thread interrupted
                      at okio.Timeout.throwIfReached(Timeout.java:145)
                      at okio.Okio$2.read(Okio.java:136)
                      at okio.AsyncTimeout$2.read(AsyncTimeout.java:211)
                      at okio.RealBufferedSource.request(RealBufferedSource.java:71)
                      at okio.RealBufferedSource.require(RealBufferedSource.java:64)
                      at okio.RealBufferedSource.readHexadecimalUnsignedLong(RealBufferedSource.java:270)
                      at okhttp3.internal.http.Http1xStream$ChunkedSource.readChunkSize(Http1xStream.java:441)
                      at okhttp3.internal.http.Http1xStream$ChunkedSource.read(Http1xStream.java:422)
                      at okio.RealBufferedSource.read(RealBufferedSource.java:50)
                      at okio.RealBufferedSource.exhausted(RealBufferedSource.java:60)
                      at okio.InflaterSource.refill(InflaterSource.java:101)
                      at okio.InflaterSource.read(InflaterSource.java:62)
                      at okio.GzipSource.read(GzipSource.java:80)
                      at okio.RealBufferedSource.request(RealBufferedSource.java:71)
                      at okhttp3.logging.HttpLoggingInterceptor.intercept(HttpLoggingInterceptor.java:225)
                      at okhttp3.RealCall$ApplicationInterceptorChain.proceed(RealCall.java:187)
                      at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:160)
                      at okhttp3.RealCall.execute(RealCall.java:57)
                      at com.test.api.ApiService.get(ApiService.java:145)
                      at com.test.UserApi$1.subscribe(UserApi.java:63)

Old 1.x Code

if (mSubscriptionLoadMe != null && !mSubscriptionLoadMe.isUnsubscribed()) 
    mSubscriptionLoadMe.unsubscribe();
mSubscriptionLoadMe = UserApi.getUserInfo(this, userId)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<String>() {
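            @Override public void onNext(String value) { /* ... */ }
            @Override public void onCompleted() { /* ... */ }
            @Override public void onError(Throwable e) { /* ... */ }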
        });

New 2.x Code

if (mSubscriptionLoadMe != null && !mSubscriptionLoadMe.isDisposed())
    mSubscriptionLoadMe.dispose();
mSubscriptionLoadMe = UserApi.getUserInfo(this, userId)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(new DisposableObserver<String>() {
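            @Override public void onNext(String value) { /* ... */ }
            @Override public void onComplete() { /* ... */ }  // renamed from onCompleted() in 2.x
            @Override public void onError(Throwable e) { /* ... */ }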
        });
Question

All 20 comments

I've added a timeout setting to OkHttpClient, but it didn't help (HTTP_TIMEOUT = 180 secs).

new OkHttpClient.Builder()
        .addInterceptor(new HttpLoggingInterceptor().setLevel(debugLevel))
        .addNetworkInterceptor(new StethoInterceptor())
        .connectTimeout(ApiService.HTTP_TIMEOUT, TimeUnit.SECONDS)
        .writeTimeout(ApiService.HTTP_TIMEOUT, TimeUnit.SECONDS)
        .readTimeout(ApiService.HTTP_TIMEOUT, TimeUnit.SECONDS)
        .build();

Maybe your thread is in an interrupted state when you subscribe.

How do I fix that? Or how can I cancel an API call subscription in 2.x?

Calling Thread.interrupted() before you subscribe will clear the interrupt state. Otherwise, this seems to be an issue with OkHttp, and you have a better chance asking them about the situation.
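
To illustrate the suggestion above, here is a minimal sketch that uses only the JDK (the class name InterruptFlagDemo and its helper method are hypothetical). It shows that the static Thread.interrupted() both reports and clears the current thread's interrupt flag, whereas isInterrupted() only reports it:

```java
final class InterruptFlagDemo {

    /** Returns the flag states seen on the current thread: {before, valueReturnedByInterrupted, after}. */
    static boolean[] clearInterruptFlag() {
        Thread.currentThread().interrupt();                       // simulate a leftover interrupt
        boolean before = Thread.currentThread().isInterrupted();  // reads the flag without clearing it
        boolean returned = Thread.interrupted();                  // reads the flag and clears it
        boolean after = Thread.currentThread().isInterrupted();   // the flag is now cleared
        return new boolean[] { before, returned, after };
    }

    public static void main(String[] args) {
        boolean[] r = clearInterruptFlag();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);       // prints "true true false"
    }
}
```

Note that this only helps if the thread was already interrupted before the blocking call started; it does not prevent an interrupt that arrives later, for example as a result of dispose().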

OK, one more question: is my migration code from 1.x to 2.x correct? I'd like to make sure that there is only one API call at a time.

if (mSubscriptionLoadMe != null && !mSubscriptionLoadMe.isDisposed())
    mSubscriptionLoadMe.dispose();
mSubscriptionLoadMe = UserApi.getUserInfo(this, userId)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(new DisposableObserver<String>() {
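            @Override public void onNext(String value) { /* ... */ }
            @Override public void onComplete() { /* ... */ }  // renamed from onCompleted() in 2.x
            @Override public void onError(Throwable e) { /* ... */ }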
        });

Yes, the patterns remained largely the same.

Got it, thanks for your help.

  override fun getCities(): Observable<List<City>> {
    return syncCities().publish { Observable.merge(it, databaseManager.getCities().takeUntil(it)) }
        .filter { it.isNotEmpty() }
        .distinct()
        .onErrorResumeNext(databaseManager.getCities())
  }

  private fun syncCities(): Observable<List<City>> {
    return Observable.error { Throwable("Error") }
  }

I'm getting

FATAL EXCEPTION: RxCachedThreadScheduler-1
Process: .debug, PID: 27335
java.lang.Throwable: Error

when running the code above. Could anyone point out what is wrong there?

EDIT: getCities() is called with subscribeOn(io()) and observeOn(mainThread()), of course.

@michaldrabik What's your full exception stack trace?

@enginebai Pasted here: http://pastebin.com/GHjJWtiS

When I remove takeUntil() everything works fine. Might be me missing something...

This doesn't work for me either. I wrote a simple test app for RxJava 2, and the following code causes the application to crash:

private Disposable subscription;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        subscription = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Thread.sleep(5000);
                if (!e.isDisposed()) {
                    e.onNext(25);
                    e.onComplete();
                }
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d("MainActivity","Value: "+integer);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.e("MainActivity","Error: "+throwable.toString());
                    }
                });
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        subscription.dispose();
    }

I start a simple async task in onCreate() and want to unsubscribe (dispose) in the activity's onDestroy() call (Android), but subscription.dispose() causes the following crash (it does not happen if Thread.sleep() has already finished):
12-19 15:11:56.797 5041-5058/com.example.don.rxjava2test E/AndroidRuntime: FATAL EXCEPTION: RxCachedThreadScheduler-1
                                                                           Process: com.example.don.rxjava2test, PID: 5041
                                                                           java.lang.InterruptedException
                                                                               at java.lang.Thread.sleep(Native Method)
                                                                               at java.lang.Thread.sleep(Thread.java:371)
                                                                               at java.lang.Thread.sleep(Thread.java:313)
                                                                               at com.example.don.rxjava2test.MainActivity$3.subscribe(MainActivity.java:28)
                                                                               at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
                                                                               at io.reactivex.Observable.subscribe(Observable.java:10514)
                                                                               at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
                                                                               at io.reactivex.Scheduler$1.run(Scheduler.java:134)
                                                                               at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
                                                                               at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
                                                                               at java.util.concurrent.FutureTask.run(FutureTask.java:237)
                                                                               at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
                                                                               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
                                                                               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
                                                                               at java.lang.Thread.run(Thread.java:761)

I expected to see my error log, but the application simply crashes.
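
The stack trace above runs through FutureTask and ScheduledRunnable, which suggests that disposing a task on Schedulers.io() interrupts the worker thread in much the same way as Future.cancel(true). The following is a JDK-only sketch of that mechanism, not RxJava's actual implementation; the class and method names are hypothetical. It shows a blocked Thread.sleep() throwing InterruptedException once the task is cancelled:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class CancelInterruptsSleepDemo {

    /** Runs a sleeping task, cancels it with interruption, and returns what the task observed. */
    static String runAndCancel() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>("not finished");

        Future<?> task = executor.submit(() -> {
            try {
                started.countDown();
                Thread.sleep(5000);              // stands in for the blocking work
                outcome.set("completed");
            } catch (InterruptedException ex) {
                outcome.set("interrupted");      // the canceller is no longer waiting for a result
            } finally {
                finished.countDown();
            }
        });

        try {
            started.await();                     // make sure the task is actually running
            task.cancel(true);                   // true = interrupt the worker thread
            finished.await(2, TimeUnit.SECONDS); // wait for the task to record its outcome
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();  // preserve the caller's interrupt status
        } finally {
            executor.shutdownNow();
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndCancel());      // prints "interrupted"
    }
}
```

In the Activity example, the InterruptedException is not caught inside subscribe(), so it escapes after the subscriber has already been disposed and has nowhere to be delivered.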

@don11995 If you don't care about such exceptions then you can suppress them via:

RxJavaPlugins.setErrorHandler(Functions.<Throwable>emptyConsumer());

@akarnokd, thank you, your solution works. But what if I want to handle this error in the subscriber?
Also, this handler is not called when the InterruptedException happens:

RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.e("MainActivity","Uncaught: "+throwable.toString());
            }
        });

Such errors happen after the lifecycle of Subscribers and you can't handle them there. Cancelling a Subscriber is an indication that you no longer want to receive any events.

That signal should be used to suppress the errors instead of conflating them with fundamentally undeliverable ones (exception during onError, after onComplete, etc.). Sending the two different types of exceptions to a single callback means that you can neither crash the app to indicate a programming problem, because it might just be someone who unsubscribed, nor simply ignore all errors sent to the plugin, because it might be a programming problem.
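
One way to keep the two kinds of error apart inside a single callback is to inspect the cause chain. The sketch below is JDK-only and the names CancellationNoise and isInterruption are hypothetical. It assumes that the error handler receives the original exception either directly or wrapped as the cause of another exception, and that an interruption in the chain means the failure came from cancellation rather than from a bug:

```java
import java.io.InterruptedIOException;

final class CancellationNoise {

    /** Walks the cause chain and reports whether any throwable in it is an interruption. */
    static boolean isInterruption(Throwable error) {
        // Bounded walk so that a cyclic cause chain cannot loop forever
        for (int depth = 0; error != null && depth < 16; depth++) {
            if (error instanceof InterruptedException || error instanceof InterruptedIOException) {
                return true;
            }
            error = error.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException(new InterruptedIOException("thread interrupted"));
        System.out.println(isInterruption(wrapped));                           // prints "true"
        System.out.println(isInterruption(new IllegalStateException("bug")));  // prints "false"
    }
}
```

A handler passed to RxJavaPlugins.setErrorHandler could ignore throwables for which such a check returns true and rethrow or report everything else. That is a heuristic, since an interruption can in principle have other causes than dispose().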

Closing via #5075 for now. Let us know if 2.0.6 fixes this for you (tomorrow).

@akarnokd 2.0.6 still doesn't resolve this issue for me. I just disposed an okhttp call but instead of unsubscribing normally, it just crashed.

Here is the crash log.

FATAL EXCEPTION: RxCachedThreadScheduler-1
Process: com.crazyhitty.chdev.ks.rssmanagerlib.demo, PID: 27708
io.reactivex.exceptions.UndeliverableException: java.io.InterruptedIOException: thread interrupted
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:43)
    at io.reactivex.Observable.subscribe(Observable.java:10700)
    at io.reactivex.internal.operators.observable.ObservableZip$ZipCoordinator.subscribe(ObservableZip.java:110)
    at io.reactivex.internal.operators.observable.ObservableZip.subscribeActual(ObservableZip.java:72)
    at io.reactivex.Observable.subscribe(Observable.java:10700)
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
    at io.reactivex.Scheduler$1.run(Scheduler.java:138)
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
    at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
    at java.lang.Thread.run(Thread.java:761)
 Caused by: java.io.InterruptedIOException: thread interrupted
    at okio.Timeout.throwIfReached(Timeout.java:146)
    at okio.Okio$2.read(Okio.java:135)
    at okio.AsyncTimeout$2.read(AsyncTimeout.java:236)
    at okio.RealBufferedSource.request(RealBufferedSource.java:66)
    at okio.RealBufferedSource.require(RealBufferedSource.java:59)
    at okio.RealBufferedSource.readHexadecimalUnsignedLong(RealBufferedSource.java:284)
    at okhttp3.internal.http1.Http1Codec$ChunkedSource.readChunkSize(Http1Codec.java:444)
    at okhttp3.internal.http1.Http1Codec$ChunkedSource.read(Http1Codec.java:425)
    at okio.RealBufferedSource.read(RealBufferedSource.java:45)
    at okio.RealBufferedSource.request(RealBufferedSource.java:66)
    at okio.RealBufferedSource.require(RealBufferedSource.java:59)
    at okio.GzipSource.consumeHeader(GzipSource.java:114)
    at okio.GzipSource.read(GzipSource.java:73)
    at okio.RealBufferedSource.request(RealBufferedSource.java:66)
    at okio.RealBufferedSource.rangeEquals(RealBufferedSource.java:387)
    at okio.RealBufferedSource.rangeEquals(RealBufferedSource.java:371)
    at okhttp3.internal.Util.bomAwareCharset(Util.java:412)
    at okhttp3.ResponseBody.string(ResponseBody.java:173)
    at com.crazyhitty.chdev.ks.rssmanager.RssReader$1.subscribe(RssReader.java:48)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10700)
    at io.reactivex.internal.operators.observable.ObservableZip$ZipCoordinator.subscribe(ObservableZip.java:110)
    at io.reactivex.internal.operators.observable.ObservableZip.subscribeActual(ObservableZip.java:72)
    at io.reactivex.Observable.subscribe(Observable.java:10700)
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
    at io.reactivex.Scheduler$1.run(Scheduler.java:138)
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
    at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
    at java.lang.Thread.run(Thread.java:761)

@crazyhitty looks like you have an Observable.create() around the okhttp blocking call. You have to check for cancellation before emitting any error if you don't want to receive such errors:

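Observable.create(emitter -> {
    // ...
    try {
        responsebody.string();
    } catch (IOException ex) {
        // string() declares IOException; an interrupted read arrives as InterruptedIOException
        if (!emitter.isDisposed()) {
            // Signal the error only while the downstream is still listening
            emitter.onError(ex);
        }
    }
})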

@akarnokd You are a genius, mate. Thanks for such an easy solution 👍

@crazyhitty since version 2.1.1 tryOnError is available:

The emitter API (such as FlowableEmitter, SingleEmitter, etc.) now features a new method, tryOnError that tries to emit the Throwable if the sequence is not cancelled/disposed. Unlike the regular onError, if the downstream is no longer willing to accept events, the method returns false and doesn't signal an UndeliverableException.

https://github.com/ReactiveX/RxJava/blob/2.x/CHANGES.md
