RxJava: PublishSubject: onNext not delivered, or delivered on the wrong thread

Created on 27 Mar 2016 · 5 Comments · Source: ReactiveX/RxJava

I was writing some automated tests to verify the behaviour of PublishSubject and noticed strange, unexpected behaviour when the subject is subscribed to via subscribeOn with a Scheduler. If I do not sleep after subscribing, the subscriber receives no onNext items, although onCompleted is called on the correct thread. If I do sleep after subscribing, the subscriber receives the onNext items, but on the wrong thread: the thread that called subject.onNext.

I wrote some unit tests to show the behaviour. Both tests fail with RxJava 1.1.2. Is this a bug, or is it expected behaviour? If it is expected, where can I find it in the documentation?

import org.junit.Test;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;

import static org.assertj.core.api.Assertions.assertThat;
import static rx.schedulers.Schedulers.newThread;

public class PublishSubjectTest {

    @Test
    public void subscribeOn_WhenNoSleep_ThenNoOnNextReceived() throws InterruptedException {

        // GIVEN
        PublishSubject<String> subject = PublishSubject.create();
        Thread currentThread = Thread.currentThread();
        TestSubscriber<String> subscriber = new TestSubscriber<>();

        // WHEN
        subject.subscribeOn(newThread()).subscribe(subscriber);
        subject.onNext("one");
        subject.onCompleted();

        // THEN
        subscriber.awaitTerminalEvent();
        assertThat(subscriber.getLastSeenThread()).isNotSameAs(currentThread);
        assertThat(subscriber.getOnNextEvents()).containsOnly("one");

    }

    @Test
    public void subscribeOn_WhenSleep_ThenOnNextReceivedButOnWrongThread() throws InterruptedException {

        // GIVEN
        PublishSubject<String> subject = PublishSubject.create();
        Thread currentThread = Thread.currentThread();
        TestSubscriber<String> subscriber = new TestSubscriber<>();

        // WHEN
        subject.subscribeOn(newThread()).subscribe(subscriber);
        Thread.sleep(2000);
        subject.onNext("one");
        subject.onCompleted();

        // THEN
        subscriber.awaitTerminalEvent();
        assertThat(subscriber.getOnNextEvents()).containsOnly("one");
        assertThat(subscriber.getLastSeenThread()).isNotSameAs(currentThread);

    }

}
Question

The problem is that you use subscribeOn instead of observeOn to move the delivery of events to another thread. subscribeOn delays the actual subscription to the PublishSubject by some amount, which leaves enough time for onNext to be called while no one is subscribed yet. You see only onCompleted because PublishSubject replays the terminal event to late Subscribers.
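
To make the "late subscriber" effect concrete, here is a minimal single-threaded sketch in plain Java. It is not RxJava code: MiniSubject, run and the "<completed>" marker are hypothetical names invented for this illustration, and the model only assumes the two properties described above (values go to whoever is subscribed at that moment, and the terminal event is replayed to late subscribers). Subscribing after the emissions stands in for a subscribeOn that loses the race.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model for illustration only; none of these names come from RxJava.
final class LateSubscriberSketch {

    /** Minimal hot source: values go to current listeners only; completion is remembered. */
    static final class MiniSubject {
        private final List<Consumer<String>> listeners = new ArrayList<>();
        private boolean completed;

        synchronized void subscribe(Consumer<String> listener) {
            if (completed) {
                listener.accept("<completed>"); // late subscribers still get the terminal event
            } else {
                listeners.add(listener);
            }
        }

        synchronized void onNext(String value) {
            for (Consumer<String> listener : listeners) {
                listener.accept(value); // with no listener registered, the value is simply dropped
            }
        }

        synchronized void onCompleted() {
            completed = true;
            for (Consumer<String> listener : listeners) {
                listener.accept("<completed>");
            }
            listeners.clear();
        }
    }

    /** Runs the same emissions with the subscription placed before or after them. */
    static List<String> run(boolean subscribeFirst) {
        MiniSubject subject = new MiniSubject();
        List<String> seen = new ArrayList<>();
        if (subscribeFirst) {
            subject.subscribe(seen::add);
        }
        subject.onNext("one");
        subject.onCompleted();
        if (!subscribeFirst) {
            subject.subscribe(seen::add); // models a subscription that arrives too late
        }
        return seen;
    }
}
```

In this model, run(true) collects "one" followed by the completion marker, while run(false) collects only the completion marker, which matches what the first test in the question observes.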

Thx David,

Indeed, observeOn seems to fix the behaviour I was seeing. With subscribeOn, and waiting long enough for the subscription to succeed, can you also explain why the terminal event is delivered on the main thread and not on the scheduler thread?

You are racing with the subscription, and by chance the subscription may happen in time for onCompleted to pass through directly to the Subscriber.
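
The thread question can be sketched in plain Java as well. Once the subscription is in place, a subject hands each event to its subscribers synchronously on whichever thread calls onNext or onCompleted; subscribeOn only chooses the thread on which the subscription itself runs. The code below is a hypothetical model, not RxJava: MiniSubject, the method names and the "sketch-worker" thread name are assumptions made for illustration. It contrasts registering on a worker thread (subscribeOn-like) with re-dispatching every value to a worker (observeOn-like).

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical model for illustration only; none of these names come from RxJava.
final class DeliveryThreadSketch {

    /** Minimal hot source that calls listeners on the emitting thread. */
    static final class MiniSubject {
        private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<String> listener) {
            listeners.add(listener);
        }

        void onNext(String value) {
            for (Consumer<String> listener : listeners) {
                listener.accept(value); // runs on the thread that called onNext
            }
        }
    }

    private static ExecutorService newWorker() {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-worker"));
    }

    /** subscribeOn-like: only the registration runs on the worker thread. */
    static String subscribeOnStyle() {
        MiniSubject subject = new MiniSubject();
        CompletableFuture<String> deliveredOn = new CompletableFuture<>();
        ExecutorService worker = newWorker();
        try {
            // join() plays the role of the sleep in the question: wait until subscribed.
            CompletableFuture.runAsync(
                    () -> subject.subscribe(v -> deliveredOn.complete(Thread.currentThread().getName())),
                    worker).join();
            subject.onNext("one"); // emitted, and therefore delivered, on the caller's thread
            return deliveredOn.join();
        } finally {
            worker.shutdown();
        }
    }

    /** observeOn-like: register immediately, but hand every value over to the worker. */
    static String observeOnStyle() {
        MiniSubject subject = new MiniSubject();
        CompletableFuture<String> deliveredOn = new CompletableFuture<>();
        ExecutorService worker = newWorker();
        try {
            subject.subscribe(v ->
                    worker.execute(() -> deliveredOn.complete(Thread.currentThread().getName())));
            subject.onNext("one");
            return deliveredOn.join();
        } finally {
            worker.shutdown();
        }
    }
}
```

In this model subscribeOnStyle() reports the name of the calling thread and observeOnStyle() reports "sketch-worker", which mirrors the second test in the question: with the sleep in place the events arrive, but on the thread that emitted them.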

Hi

I am not sure about the race condition explanation, because I can reproduce it with a BehaviorSubject too (with onNext() called after the subscription and a timer before it):

@Test
public void testSubjectThreadFollowSubscribeOn() {
    TestObserver<String> tester = new TestObserver<>();
    BehaviorSubject<Integer> source = BehaviorSubject.create();
    String currentThread = Thread.currentThread().getName();

    source
        .subscribeOn(Schedulers.io())
        .take(1)
        .map(o -> Thread.currentThread().getName())
        .subscribe(tester);
    source.onNext(1);

    tester.awaitCount(1);
    tester.assertValue(threadAfterSubscribeOn -> !threadAfterSubscribeOn.equals(currentThread));
}

You still have a race where onNext could find an already subscribed observer and signal the first item on the test thread.
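
A rough way to see both outcomes of that race is to force each ordering in a small plain-Java model. This is a sketch under assumptions, not RxJava code: MiniBehaviorSubject, deliveryThread and the "sketch-worker" thread name are hypothetical. The model assumes only that a behavior-style subject replays its latest value to a new subscriber during the subscribe call, and otherwise delivers on the emitting thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical model for illustration only; none of these names come from RxJava.
final class BehaviorRaceSketch {

    /** Minimal behavior-style source: remembers the latest value and replays it on subscribe. */
    static final class MiniBehaviorSubject {
        private final List<Consumer<Integer>> listeners = new ArrayList<>();
        private Integer latest;

        synchronized void subscribe(Consumer<Integer> listener) {
            listeners.add(listener);
            if (latest != null) {
                listener.accept(latest); // replay happens on the subscribing thread
            }
        }

        synchronized void onNext(Integer value) {
            latest = value;
            for (Consumer<Integer> listener : listeners) {
                listener.accept(value); // live delivery happens on the emitting thread
            }
        }
    }

    /** Subscribes on a worker thread and reports which thread saw the first value. */
    static String deliveryThread(boolean emitBeforeSubscribe) {
        MiniBehaviorSubject subject = new MiniBehaviorSubject();
        CompletableFuture<String> deliveredOn = new CompletableFuture<>();
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-worker"));
        try {
            if (emitBeforeSubscribe) {
                subject.onNext(1); // the emitter wins the race
            }
            CompletableFuture.runAsync(
                    () -> subject.subscribe(v -> deliveredOn.complete(Thread.currentThread().getName())),
                    worker).join();
            if (!emitBeforeSubscribe) {
                subject.onNext(1); // the subscription wins the race
            }
            return deliveredOn.join();
        } finally {
            worker.shutdown();
        }
    }
}
```

Here deliveryThread(true) reports "sketch-worker" because the value is replayed during the subscription, while deliveryThread(false) reports the calling thread. A test like the one above passes or fails depending on which ordering happens to occur.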
