RxJava: execution stops and a MissingBackpressureException occurs

Created on 14 Jan 2015 · 10 Comments · Source: ReactiveX/RxJava

Execution stops and the exception occurs after roughly 500 to 10000 events.

I am using rxjava-1.0.4 and rxandroid-0.24.0.

Code:

    Observable.interval(1000, TimeUnit.SECONDS)
        .map(count -> {
            return Context.getTime();
        })
        .retry()
        .subscribe(time -> {
            System.out.println(time);
            timeTextView.setText(time);
        });

Error message:

...
2015-01-14 13:30:01
2015-01-14 13:30:02
2015-01-14 13:30:03
2015-01-14 13:30:04
2015-01-14 13:30:05
rx.exceptions.MissingBackpressureException
        at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:338)
        at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:115)
        at rx.internal.operators.OperatorSubscribeOn$1$1$1.onNext(OperatorSubscribeOn.java:76)
        at rx.internal.operators.OnSubscribeRedo$2$1.onNext(OnSubscribeRedo.java:231)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:51)
        at rx.Scheduler$Worker$1.call(Scheduler.java:120)
        at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:422)
        at java.util.concurrent.FutureTask.run(FutureTask.java:237)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
        at java.lang.Thread.run(Thread.java:841)

All 10 comments

It looks like your code does not match the error message. The stack trace shows observeOn and subscribeOn, but where are they in your code? And Observable.interval(1000, TimeUnit.SECONDS) should be Observable.interval(1, TimeUnit.SECONDS), right?

If you see MissingBackpressureException, it's usually because the Subscriber cannot keep up with the speed of the source.
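To make the "cannot keep up" case concrete, here is a minimal plain-Java sketch, not RxJava's actual implementation. It assumes a fixed-capacity buffer between producer and consumer, which is what the RxRingBuffer frame in the stack trace above suggests. The class name, the `OverflowException` type, and the tick-based timing are all hypothetical and exist only for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class BoundedBufferOverflow {
    /** Hypothetical stand-in for MissingBackpressureException; not the real class. */
    static final class OverflowException extends RuntimeException {
        OverflowException(String message) {
            super(message);
        }
    }

    /**
     * The producer emits one value per tick. The consumer takes one value
     * every consumerPeriod ticks. Returns the number of accepted values,
     * or throws as soon as the producer finds the buffer full.
     */
    static int emitUntilOverflow(int capacity, int consumerPeriod, int maxTicks) {
        Queue<Integer> buffer = new ArrayDeque<>();
        int accepted = 0;
        for (int tick = 1; tick <= maxTicks; tick++) {
            if (buffer.size() == capacity) {
                // No room left and the source cannot be slowed down.
                throw new OverflowException("buffer full after " + accepted + " values");
            }
            buffer.add(tick);
            accepted++;
            if (tick % consumerPeriod == 0) {
                buffer.poll(); // the consumer drains one value
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Consumer as fast as the producer: no overflow, prints 100.
        System.out.println(emitUntilOverflow(4, 1, 100));
        try {
            // Consumer half as fast: the buffer fills up eventually.
            emitUntilOverflow(4, 2, 100);
        } catch (OverflowException e) {
            System.out.println(e.getMessage()); // prints "buffer full after 7 values"
        }
    }
}
```

A larger buffer only delays the failure: as long as the consumer is slower than the source on average, the buffer fills eventually, which would be consistent with the exception appearing only after many events.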

@zsxwing

Oh! Sorry.
"Observable.interval(1, TimeUnit.SECONDS)" is right!

Hmm...
How can I solve this problem? (Use timer?)

Interval doesn't (and can't) support backpressure so you need to use onBackpressureDrop or similar operators to handle the overflow of values.
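As a rough model of what a drop-style operator does, the plain-Java sketch below forwards a value only while the downstream has outstanding requests and discards it otherwise. It is single-threaded and simplified, not the RxJava source. The class `DropWhenNotRequested` and its methods are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class DropWhenNotRequested<T> {
    private final Consumer<T> downstream;
    private long requested; // values the downstream has asked for but not yet received
    private long dropped;   // values discarded because nothing was requested

    DropWhenNotRequested(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /** Called by the downstream to signal that it can accept n more values. */
    void request(long n) {
        requested += n;
    }

    /** Called by the source, which emits at its own pace. */
    void onNext(T value) {
        if (requested > 0) {
            requested--;
            downstream.accept(value);
        } else {
            dropped++; // overflow is discarded instead of raising an error
        }
    }

    long dropped() {
        return dropped;
    }

    public static void main(String[] args) {
        List<Integer> received = new ArrayList<>();
        DropWhenNotRequested<Integer> drop = new DropWhenNotRequested<>(received::add);

        drop.request(3);               // downstream has room for three values
        for (int i = 1; i <= 5; i++) {
            drop.onNext(i);            // 4 and 5 arrive with no outstanding request
        }
        drop.request(1);               // downstream caught up
        drop.onNext(6);

        System.out.println(received);       // prints [1, 2, 3, 6]
        System.out.println(drop.dropped()); // prints 2
    }
}
```

The trade-off is data loss: the subscriber never sees the dropped values, which is acceptable for something like a clock display where only the latest tick matters.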

@akarnokd

I used onBackpressureDrop, but I still get the same problem.

    Observable.interval(1000, TimeUnit.SECONDS)
        .map(count -> {
            return Context.getTime();
        })
        .retry()
        .onBackpressureDrop()
        .subscribe(time -> {
            System.out.println(time);
            timeTextView.setText(time);
        });

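There must be something in your code that you are not showing that causes this issue. This works for me:

    import java.util.concurrent.TimeUnit;

    import rx.Observable;
    import rx.schedulers.Schedulers;

    public class IntervalRetry {
        public static void main(String[] args) throws Exception {
            // Emit much faster than the subscriber can print.
            Observable.interval(1, TimeUnit.MICROSECONDS)
                .map(count -> {
                    return System.currentTimeMillis();
                })
                // Discard values the downstream has not requested.
                .onBackpressureDrop()
                .retry()
                .observeOn(Schedulers.computation())
                .subscribe(time -> {
                    System.out.println(time);
                }, Throwable::printStackTrace, System.out::println);
            // Keep the main thread alive while the scheduler threads run.
            Thread.sleep(10000);
        }
    }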

@akarnokd
Oh! Thank you.

You are welcome.

@akarnokd what is the purpose of retry() in your code?

I don't know, it was part of the OP's question.

@akarnokd gotcha, thanks for the quick answer. I thought it was necessary to use it with onBackpressureDrop().
