RxJava: Unexpected difference between flatMap and concatMap when the stream has only one element

Created on 15 Jul 2016 · 13 Comments · Source: ReactiveX/RxJava

Hello,

I am encountering an unexpected behavior in a piece of code I wrote. I've tried to narrow down a reproduction case in this gist.

To sum up :

  • It's on RxJava 1.1.7
  • I have 2 Observables built in such a way that I am sure their elements match
  • These Observables come from a flatMap on another Observable that emits only one element
  • I zip the 2 Observables and see that, in the output, some elements do not match
  • Important: I use observeOn/subscribeOn because, in real life, my process performs I/O.

What I observe :

  • If I replace flatMap with concatMap, the problem disappears. The way I understand both operators, they should not behave differently if the stream being flatMapped emits only one element. Am I missing something?
  • If I remove observeOn/subscribeOn, the problem disappears, so I suspect a concurrency issue.
  • It looks as though flatMap shuffles data, but a simpler test (using the method initTestFlowWithOrderCheck instead of initTestFlowWithZip) seems to rule out this hypothesis.
  • In the end, it could be a concurrency issue in zip, but I've read the operator's source extensively and could not see any problem with it.

That's where I am with my investigation. I may have missed something obvious to someone else, so I'm looking forward to your feedback.

Thx

Question


All 13 comments

By definition, flatMap doesn't guarantee any ordering of its events: not between different sources, and sometimes not even within the same source (solo or otherwise).

If you drive it in an async flow, signals may skip ahead, yielding the inconsistency. Therefore, if you need strict ordering, use concat/concatMap or concatEager/concatMapEager.
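The ordering difference between the two families can be illustrated without RxJava. The sketch below is a plain-Java analogy, not RxJava's implementation: each `CompletableFuture` stands in for an inner source that emits a single value asynchronously, and the hypothetical helpers `mergeLike` and `concatLike` forward values the way merge/flatMap and concat/concatMap do in spirit. The merge-like helper forwards whichever value arrives first; the concat-like helper waits for source i before forwarding source i+1.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-Java analogy, not RxJava code: each future plays the role of an
// inner source that emits exactly one value at some later time.
class MergeVsConcat {

    // merge/flatMap-like: forward each value as soon as its source completes,
    // so the output follows completion order.
    static List<String> mergeLike(List<CompletableFuture<String>> sources) {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        for (CompletableFuture<String> source : sources) {
            source.thenAccept(out::add);
        }
        return out;
    }

    // concat/concatMap-like: forward source i only after sources 0..i-1 have
    // been forwarded, so the output follows subscription order.
    static List<String> concatLike(List<CompletableFuture<String>> sources) {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (CompletableFuture<String> source : sources) {
            chain = chain.thenCompose(ignored -> source).thenAccept(out::add);
        }
        return out;
    }
}
```

If the second future completes before the first, `mergeLike` yields `[B, A]` while `concatLike` still yields `[A, B]`. The price of the concat-like ordering is that a fast source has to wait for a slow predecessor.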

but sometimes not even within the same source

Are you talking about onNext from one source being emitted out of order with itself? Something like this:

--A-B--->
--C----->

flatMap

--C-B-A->

Yes.

That is surprising to me. @benjchristensen, I thought the ordering of onNext calls from one Observable had to be maintained. I don't see it in the general Rx contract anywhere, but I know we've talked about it verbally over the years.

I'm still having a hard time understanding what happens under the hood. @akarnokd, I'm not saying you're wrong; I just want to understand.

First of all, how can a single source get shuffled? Map is probably not responsible for that, so it must come from merge. As I understand the merge operator (OperatorMerge), there are two ways to emit a scalar coming from a source: the fast path, through tryEmit, or the emit loop, whose role is to drain the values left behind by failed fast-path attempts. Because each source/inner subscriber has its own queue, and because the emitting flag ensures mutual exclusion, I don't see how reordering could happen inside the operator. I would rather expect it to come from the fact that the inner subscriber requests batches of scalars that, in an asynchronous flow, may come back in a shuffled order. But then the shuffling would not be induced by the merge operator, and hence not by flatMap. What am I missing here?
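The mutual exclusion via an emitting flag described above follows the general "emitter loop" pattern. Here is a hypothetical, simplified sketch of that pattern (the class name `EmitterLoop` is illustrative, and this is not OperatorMerge's actual code). It assumes every value goes through one queue and whichever thread sets the flag drains it. Under those assumptions, a single source's values stay in FIFO order.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical, simplified emitter loop (not OperatorMerge's code).
// Null values are not supported: poll() returning null means "queue empty".
class EmitterLoop<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final Consumer<T> downstream;
    private boolean emitting; // guarded by this

    EmitterLoop(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    void onNext(T value) {
        synchronized (this) {
            queue.offer(value);   // every value goes through the queue
            if (emitting) {
                return;           // the current emitter will drain it
            }
            emitting = true;      // this thread becomes the emitter
        }
        for (;;) {
            T next;
            synchronized (this) {
                next = queue.poll();
                if (next == null) {
                    emitting = false; // nothing left: release the flag
                    return;
                }
            }
            downstream.accept(next);  // delivered outside the lock, one at a time
        }
    }
}
```

A value that arrives while another one is being delivered (for example, re-entrantly from the consumer) is queued and delivered afterwards instead of being interleaved. This pattern never lets a value skip the queue, which is why it cannot reorder a single source on its own.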

Assuming now that flatMap may shuffle even the events emitted by one of the Observables produced in the map step, I should be able to reproduce the problem without zip (using initTestFlowWithOrderCheck, as mentioned in my third observation). How can zip play a role in it, given your explanation?

I think the Javadoc is lacking here. Users of the API should be warned that merge and the operators composed from it (flatMap, for instance) may shuffle the flow. I can propose a documentation update if the behavior is confirmed.

Finally, I agree with @abersnaze that such behavior is surprising and unexpected.

It's a property of the queue bypass interacting with the backpressure logic.

This happens when the coordinator runs out of requests. When the source emits, the value goes to the queue. Then a request comes in (Thread A) and increments the counter, but at the same time the source emits again (Thread B). Now the bypass sees a non-zero requested amount and emits the later value immediately. Then either it or the requesting thread enters the drain loop and emits the first value from the queue.
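To make this interleaving concrete, here is a deterministic replay in plain Java. `BypassingInner` is a hypothetical, heavily simplified model of an inner subscriber with a queue-bypass fast path, not RxJava's actual code. request(n) is split into `addRequested` and `drain` (illustrative names) so that the steps of Thread A and Thread B can be replayed one after another on a single thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of an inner subscriber with a queue-bypass fast path.
// Not RxJava's OperatorMerge; request(n) is split into two steps for replay.
class BypassingInner {
    final List<String> emitted = new ArrayList<>(); // what downstream receives
    private final Queue<String> queue = new ArrayDeque<>();
    private long requested;
    private boolean emitting;

    // Source side (Thread B in the explanation).
    void onNext(String value) {
        synchronized (this) {
            if (requested == 0 || emitting) {
                queue.offer(value);  // slow path: park the value in the queue
                return;
            }
            emitting = true;         // fast path: bypass the queue...
            requested--;
        }
        emitted.add(value);          // ...even though older values may still be queued
        drainLoop();
    }

    // First half of request(n) on the requesting thread (Thread A).
    synchronized void addRequested(long n) {
        requested += n;
    }

    // Second half of request(n): try to enter the drain loop.
    void drain() {
        synchronized (this) {
            if (emitting) {
                return;
            }
            emitting = true;
        }
        drainLoop();
    }

    private void drainLoop() {
        for (;;) {
            String next;
            synchronized (this) {
                if (requested == 0 || queue.isEmpty()) {
                    emitting = false;
                    return;
                }
                next = queue.poll();
                requested--;
            }
            emitted.add(next);
        }
    }
}
```

Replaying the steps in the order described (A is queued while there is no demand, Thread A increments the counter, Thread B emits, Thread A drains) delivers B before A, even though both came from the same source.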

Ok, I get it. Thanks a lot for the explanation.

Wouldn't it be worth trying to drain the source's queue at the beginning of the scalar emission? Of course this would be a slower path, but still faster than the whole emitLoop.
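One possible way to realize this suggestion, sketched on the same kind of hypothetical model (not RxJava's code, and not necessarily how the actual fix was implemented): the emitter takes the bypass only when there is demand and the queue is empty. Otherwise it appends the new value to the queue and drains, so older values always go out first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model: the fast path checks the queue first, so a newer
// value can never overtake an older queued one from the same source.
class OrderedInner {
    final List<String> emitted = new ArrayList<>(); // what downstream receives
    private final Queue<String> queue = new ArrayDeque<>();
    private long requested;
    private boolean emitting;

    // Source side (Thread B in the explanation).
    void onNext(String value) {
        boolean bypass;
        synchronized (this) {
            if (emitting) {
                queue.offer(value);  // the current emitter will pick it up
                return;
            }
            emitting = true;
            // Bypass only if there is demand AND nothing older is waiting.
            bypass = requested > 0 && queue.isEmpty();
            if (bypass) {
                requested--;
            } else {
                queue.offer(value);  // join the back of the queue: FIFO preserved
            }
        }
        if (bypass) {
            emitted.add(value);
        }
        drainLoop();                 // deliver queued values, oldest first, if demand allows
    }

    // First half of request(n) on the requesting thread (Thread A).
    synchronized void addRequested(long n) {
        requested += n;
    }

    // Second half of request(n): try to enter the drain loop.
    void drain() {
        synchronized (this) {
            if (emitting) {
                return;
            }
            emitting = true;
        }
        drainLoop();
    }

    private void drainLoop() {
        for (;;) {
            String next;
            synchronized (this) {
                if (requested == 0 || queue.isEmpty()) {
                    emitting = false;
                    return;
                }
                next = queue.poll();
                requested--;
            }
            emitted.add(next);
        }
    }
}
```

With the same replay as before (A queued, counter incremented, B emitted, drain), this model delivers A before B. The extra `isEmpty` check runs on every fast-path emission.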

--A-B--->
--C----->

flatMap

--C-B-A->

That breaks the Rx contract. Ordering within a single stream should always be maintained. Interleaving via merge, etc. happens between streams, not between elements of the same stream.

@benjchristensen, then the operator delay(Func1<T, Observable<U>>) would be illegal in cases where the per-item delays change the order. This is more a matter of expectation regarding flatMap.

The fix is trivial but likely adds about 10% overhead to the fast-path emissions.

That operator lets a developer consciously reorder events, so it doesn't break the contract: it isn't the Observable itself changing the order, but user code choosing to do so.

It's similar to how filter chooses to drop events. But filter can't reorder them, for example.

I agree completely that reordering within a single Observable is not at all what I would intuitively expect.

If we do want to add the ability to arbitrarily reorder events, then we should definitely make it explicit rather than the default behavior.

See #4209.

Closing via #4209. Fix released in 1.1.8.
