RxJava: Threading in a SerializedSubject

Created on 14 Mar 2016 · 12 Comments · Source: ReactiveX/RxJava

With a usual Observable, the Observer is executed on the same thread on which the Observable is created. With SerializedSubject, there's some magic that is unclear to me. Below is the code snippet:

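import rx.Observable
import rx.functions.Action1
import rx.schedulers.Schedulers
import rx.subjects.{PublishSubject, SerializedSubject}

object Launcher extends App {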
  val pubSub = PublishSubject.create[Int]
  val serSub = new SerializedSubject(pubSub)
  val observable = Observable.just(1, 2).subscribeOn(Schedulers.computation())

  serSub.subscribe(new Action1[Int] {
    override def call(t: Int): Unit =
      println(s"Subject subscribeOn $t, ${Thread.currentThread().getName}")
  })

  observable.subscribe(serSub)
  Observable.just(6).subscribe(serSub)

  while (true) {}
}

One would expect the numbers 1 and 2 to be printed on some computation thread, and the 6 to be printed on the main thread. Now, if I remove the Observable.just(6) line, everything is printed on the computation thread. Is that the expected behavior? If so, how does the SerializedSubject decide which thread to run on?

Now, I can modify the code a bit (publish to pubSub and subscribe to serSub):

object Launcher extends App {
  val pubSub = PublishSubject.create[Int]
  val serSub = new SerializedSubject(pubSub)
  val observable = Observable.just(1, 2).subscribeOn(Schedulers.computation())

  serSub.subscribe(new Action1[Int] {
    override def call(t: Int): Unit =
      println(s"Subject subscribeOn $t, ${Thread.currentThread().getName}")
  })

  observable.subscribe(pubSub)
  Observable.just(6, 7, 8, 9, 10).subscribe(pubSub)

  while (true) {}
}

With this change, things get printed on the expected threads. Is this a legitimate way to do it if I want this behavior?


All 12 comments

I'm not looking at the source code at the moment, but I suspect my description won't be far off the mark. When thread A sends an item to a SerializedSubject, that item is added to a queue. If no drain loop is currently running on that queue (to emit the queued elements to all the subscribers of the subject), then thread A runs the drain loop itself and emits the items on thread A. If thread B is currently running the drain loop, then B keeps iterating through the queued items, eventually pops the element that thread A put there, and emits it (on thread B).

Thus what you are seeing is a race to get the drain loop by the main thread and the computation thread.
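
To make the race concrete, here is a minimal plain-Java sketch of that kind of drain loop. It is not RxJava's actual implementation: the class names SerializingEmitter and SerializingEmitterDemo are made up for illustration, and the sketch assumes non-null items. Whichever thread finds no emitter running becomes the emitter and also delivers whatever the other threads queue in the meantime.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical stand-in for a serializing wrapper; not an RxJava class.
final class SerializingEmitter<T> {
    private final Consumer<T> downstream;
    private final Queue<T> queue = new ArrayDeque<>(); // guarded by "this"
    private boolean emitting;                          // guarded by "this"

    SerializingEmitter(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    void onNext(T item) {
        synchronized (this) {
            if (emitting) {
                // Another thread owns the drain loop: hand the item over and leave.
                queue.add(item);
                return;
            }
            emitting = true; // this thread won the race and becomes the emitter
        }
        downstream.accept(item);
        for (;;) {
            T next;
            synchronized (this) {
                next = queue.poll();
                if (next == null) {
                    emitting = false; // nothing left, give up the emitter role
                    return;
                }
            }
            // Delivered on this thread, even if another thread queued it.
            downstream.accept(next);
        }
    }
}

final class SerializingEmitterDemo {
    // Emits from two threads and records "value@threadName" for every delivery.
    static List<String> run(int perThread) {
        List<String> seen = new ArrayList<>(); // plain list is fine: deliveries never overlap
        SerializingEmitter<Integer> emitter = new SerializingEmitter<>(
                v -> seen.add(v + "@" + Thread.currentThread().getName()));
        Runnable producer = () -> {
            for (int i = 0; i < perThread; i++) {
                emitter.onNext(i);
            }
        };
        Thread a = new Thread(producer, "A");
        Thread b = new Thread(producer, "B");
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

Calling SerializingEmitterDemo.run(1000) a few times and inspecting the thread names should show values produced by thread A being delivered on thread B and vice versa, with the split varying from run to run.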

Does that help?

@davidmoten Thanks a lot! But I'm still a bit confused. So essentially, I have

val pubSub = PublishSubject.create[Int]
val serSub = new SerializedSubject(pubSub)

and all my actions are subscribed to serSub. Now, if I call onNext on serSub from two threads, everything always gets invoked on just one of the threads, which is the behavior that you described. But if I call onNext on pubSub from two different threads, then the actions are executed on the correct threads.

It would be great to know if these are expected behaviors, and moreover, if the second way (onNext on pubSub but subscribe to serSub) is a legal way to use a SerializedSubject.

Thanks!!!

Calling onNext from multiple threads on a PublishSubject violates its contract. It's just coincidental that such calls don't cause problems with the Subscriber downstream. If you want to stay on the "correct" thread, don't use a Subject but set up a sequence for each thread via subscribeOn.

@akarnokd Thanks for the reply. Just to confirm, calling onNext from multiple threads on a PublishSubject still violates the contract, even if I only subscribe to the SerializedSubject?

Could you please explain more about what you mean by setting up a sequence for each thread?

My main goal was to create an EventBus using RxJava. What should I use instead of a Subject?

"still violate the contract"

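Yes. SerializedSubject wraps the PublishSubject and makes sure concurrent calls to SerializedSubject.onNext are serialized so that only one thread emits at a time (though which thread that is may hop between emissions). The PublishSubject is thus driven by non-concurrent onNext calls. Calling onNext on the PublishSubject directly bypasses that wrapper, so those calls are not serialized.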

If you can distinguish between events, then you can have different receivers:

Subject<Integer, Integer> subject = PublishSubject.<Integer>create().toSerialized();

Observable<Integer> odd = subject.filter(v -> v % 2 != 0).observeOn(Schedulers.io());
Observable<Integer> even = subject.filter(v -> v % 2 == 0).observeOn(Schedulers.newThread());

odd.subscribe(...);
even.subscribe(...);
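
For readers without RxJava at hand, the same routing idea can be sketched in plain Java. This is only an analogy, not how filter or observeOn are implemented: RoutingBus and its methods are hypothetical names, and each receiver simply pairs a predicate with the Executor on which its handler should run.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical plain-Java event bus; the names are illustrative, not RxJava API.
final class RoutingBus<T> {
    private static final class Route<T> {
        final Predicate<T> filter;
        final Executor executor;
        final Consumer<T> handler;

        Route(Predicate<T> filter, Executor executor, Consumer<T> handler) {
            this.filter = filter;
            this.executor = executor;
            this.handler = handler;
        }
    }

    private final List<Route<T>> routes = new CopyOnWriteArrayList<>();

    // Roughly analogous to subject.filter(filter).observeOn(...).subscribe(handler).
    void subscribe(Predicate<T> filter, Executor executor, Consumer<T> handler) {
        routes.add(new Route<>(filter, executor, handler));
    }

    // The poster's thread only hands events over; each handler runs on its own executor.
    void post(T event) {
        for (Route<T> route : routes) {
            if (route.filter.test(event)) {
                route.executor.execute(() -> route.handler.accept(event));
            }
        }
    }
}
```

With a single-thread executor per receiver, each handler always runs on its own thread, regardless of which thread called post.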

@akarnokd That's a very helpful answer! I noticed that you are using observeOn in your example instead of subscribeOn (as you mentioned in the previous comment). Since subscribeOn usually changes the thread upstream, I would expect it to change the thread where the subject's onNext is called and hence where the subscribed action is executed. However, when I play with it, subscribeOn doesn't actually affect anything. Is it because SerializedSubject is doing some _serializing_ which makes subscribeOn useless?

In general, due to what @davidmoten said, the actual thread where things get executed (without using observeOn to modify it) is essentially only determined at run time. Is my understanding correct?

Thanks again!

There is a significant difference between subscribeOn and observeOn. Simply put, subscribeOn moves the subscription to another thread:

subject.subscribe(...); // current thread

executor.submit(() -> subject.subscribe(...)); // another thread

and observeOn moves the observation of onXXX calls to another thread:

subscriber.onNext(...); // current thread

executor.submit(() -> subscriber.onNext(...)); // other thread

I like describing them as if they were for loops. subscribeOn is like scheduling outside the for loop:
executor.submit(() -> { for (T item : items) { onNext(item); } });
and observeOn is like scheduling inside the for loop:
for (T item : items) { executor.submit(() -> onNext(item)); }
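
A runnable version of the two loops, as a rough plain-Java analogy rather than RxJava code (LoopScheduling and its methods are hypothetical names). Each log entry records the thread that pulled the item from the source and the thread that delivered it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class LoopScheduling {
    // Single daemon thread named "worker" so the thread is easy to spot in the log.
    static ExecutorService newWorker() {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "worker");
            t.setDaemon(true);
            return t;
        });
    }

    // subscribeOn analogy: the whole loop is submitted once, so the source is
    // iterated AND each item is delivered on the executor's thread.
    static List<String> scheduleOutside(List<Integer> items, ExecutorService executor) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        await(executor.submit(() -> {
            for (Integer item : items) {
                String pulledOn = Thread.currentThread().getName();
                log.add(describe(item, pulledOn));
            }
        }));
        return log;
    }

    // observeOn analogy: the loop runs on the caller's thread and only the
    // delivery of each item is handed to the executor.
    static List<String> scheduleInside(List<Integer> items, ExecutorService executor) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        List<Future<?>> pending = new ArrayList<>();
        for (Integer item : items) {
            String pulledOn = Thread.currentThread().getName();
            pending.add(executor.submit(() -> { log.add(describe(item, pulledOn)); }));
        }
        for (Future<?> f : pending) {
            await(f);
        }
        return log;
    }

    // Called on the delivering thread, so currentThread() is where "onNext" runs.
    private static String describe(Integer item, String pulledOn) {
        return item + ": pulled on " + pulledOn
                + ", delivered on " + Thread.currentThread().getName();
    }

    private static void await(Future<?> f) {
        try {
            f.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In this analogy, a Subject has no loop of its own to move: its items are pushed in by whoever calls onNext, so only the inside-the-loop form changes where a handler runs.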

@akarnokd and @abersnaze So usually, people say that subscribeOn changes the thread _upstream_. Is that because things are evaluated lazily, so when the for loop runs inside the new thread, the items are evaluated on the new thread as well?

So now, back to my question about SerializedSubject. Due to some magic that SerializedSubject does, the actual thread where an Action (in subject.subscribe(...)) is executed is undetermined until run time, right?

If there is an emission race from multiple sources to a SerializedSubject, any of the participating threads may end up emitting to the subscriber's onNext method and keep emitting the others' values as well.

I'm closing this issue due to inactivity. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.

what
