Rxjava: Caused by: rx.exceptions.MissingBackpressureException

Created on 7 Mar 2016  路  6Comments  路  Source: ReactiveX/RxJava

java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add onError handling.
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:60)
at android.os.Handler.handleCallback(Handler.java:733)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:136)
at android.app.ActivityThread.main(ActivityThread.java:5315)
at java.lang.reflect.Method.invokeNative(Native Method)
at java.lang.reflect.Method.invoke(Method.java:515)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:864)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:680)
at dalvik.system.NativeStart.main(Native Method)
Caused by: rx.exceptions.OnErrorNotImplementedException
at rx.Observable$27.onError(Observable.java:7535)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:154)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:111)
at rx.internal.operators.OperatorSubscribeOn$1$1$1.onError(OperatorSubscribeOn.java:71)
at rx.internal.operators.OperatorOnBackpressureDrop$2.onError(OperatorOnBackpressureDrop.java:76)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.pollQueue(OperatorObserveOn.java:197)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$2.call(OperatorObserveOn.java:170)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
... 9 more
Caused by: rx.exceptions.MissingBackpressureException
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:138)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:51)
at rx.Scheduler$Worker$1.call(Scheduler.java:120)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:422)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
at java.lang.Thread.run(Thread.java:841)
rx.exceptions.OnErrorNotImplementedException
at rx.Observable$27.onError(Observable.java:7535)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:154)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:111)
at rx.internal.operators.OperatorSubscribeOn$1$1$1.onError(OperatorSubscribeOn.java:71)
at rx.internal.operators.OperatorOnBackpressureDrop$2.onError(OperatorOnBackpressureDrop.java:76)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.pollQueue(OperatorObserveOn.java:197)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$2.call(OperatorObserveOn.java:170)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at android.os.Handler.handleCallback(Handler.java:733)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:136)
at android.app.ActivityThread.main(ActivityThread.java:5315)
at java.lang.reflect.Method.invokeNative(Native Method)
at java.lang.reflect.Method.invoke(Method.java:515)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:864)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:680)
at dalvik.system.NativeStart.main(Native Method)
Caused by: rx.exceptions.MissingBackpressureException
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:138)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:51)
at rx.Scheduler$Worker$1.call(Scheduler.java:120)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:422)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
at java.lang.Thread.run(Thread.java:841)
rx.exceptions.MissingBackpressureException
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:138)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:51)
at rx.Scheduler$Worker$1.call(Scheduler.java:120)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:422)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
at java.lang.Thread.run(Thread.java:841)

Question

All 6 comments

You applied the onBackpressureDrop at the wrong location. You should apply it just after interval.

@akarnokd Thanks for your answer. But i don鈥榯 known what's wrong in my code.
Observable.timer(1, 1, TimeUnit.SECONDS) .map(aLong -> calcLeftTime()) .observeOn(AndroidSchedulers.mainThread()) .onBackpressureDrop() .subscribeOn(Schedulers.io()) .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { if (aLong > 0) { countTime.updateShow(aLong); } else { countTime.setVisibility(View.INVISIBLE); } } }, Throwable::printStackTrace);

@itxuye

  1. Apply onBackpressureDrop() after timer()
  2. You don't need subscribeOn() since Observable.timer() will run on Schedulers.computation() by default and you're not overriding it via subscribeOn().

@artem-zinnatullin Thanks.

The reason is the observeOn is a producer/consumer queue between the timer and the work being done in the main thread. That queue is filling up because the producer is faster.

@artem-zinnatullin Thanks.

Was this page helpful?
0 / 5 - 0 ratings