RxSwift: Data race in Merge.swift

Created on 5 Jan 2017 · 9 comments · Source: ReactiveX/RxSwift

Short description of the issue:

MergeSinkIter._parent._stopped is read outside of a lock, causing a ThreadSanitizer (TSAN) crash. It is read at Merge.swift:262 and written at Merge.swift:320.
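In miniature, the shape of the race looks like this (a simplified, hypothetical stand-in, not the actual RxSwift code — only the locking pattern is taken from the report):

```swift
import Foundation

// Sketch of the racy pattern: `stopped` is written under `lock` by the
// outer sequence but read without the lock by the inner one.
final class RacySinkSketch {
    let lock = NSRecursiveLock()
    var stopped = false

    // Outer sequence completing (cf. MergeSink.on, Merge.swift:320):
    // the write happens under the lock.
    func outerCompleted() {
        lock.lock(); defer { lock.unlock() }
        stopped = true
    }

    // Inner sequence completing (cf. MergeSinkIter.on, Merge.swift:262):
    // the unlocked read of `stopped` is what ThreadSanitizer reports.
    func innerCompleted() -> Bool {
        return stopped
    }
}
```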

==================
WARNING: ThreadSanitizer: data race (pid=78886)
  Read of size 1 at 0x7d2000049ee0 by thread T2:
    #0 _TFC7RxSwift13MergeSinkIter2onfGOS_5Eventw_1E_T_ Merge.swift:262 (RxSwift+0x00000015ce02)
    #1 _TTWu1_R_7RxSwift25ObservableConvertibleType0_S_12ObserverTypew_1Ezw0_1ErGCS_13MergeSinkIterxq_q0__S1_S_FS1_2onfGOS_5EventwxS3__T_ Merge.swift (RxSwift+0x00000015d810)
    #2 _TFC7RxSwift4Sink9forwardOnfGOS_5Eventwx1E_T_ Sink.swift:29 (RxSwift+0x000000094bd7)
    #3 _TFC7RxSwift23AnonymousObservableSink2onfGOS_5Eventwx1E_T_ AnonymousObservable.swift:31 (RxSwift+0x0000001c3ef4)
    #4 _TTWuRx7RxSwift12ObserverTyperGCS_23AnonymousObservableSinkx_S0_S_FS0_2onfGOS_5Eventwx1E_T_ AnonymousObservable.swift (RxSwift+0x0000001c4610)
    #5 _TPA AnyObserver.swift (RxSwift+0x00000001b502)
    #6 _TFV7RxSwift11AnyObserver2onfGOS_5Eventx_T_ AnyObserver.swift:41 (RxSwift+0x000000019f4c)
    #7 _TFFFe11RxAlamofireRxC9Alamofire11DataRequestrV7RxSwift8Reactive14responseResultuRd__S0_30DataResponseSerializerProtocolrFT5queueGSqCSo13DispatchQueue_18responseSerializerqd___GCS2_10ObservableTCSo15HTTPURLResponsewd__16SerializedObject__U_FGVS2_11AnyObserverTS7_QQd__16SerializedObject__PS2_10Disposable_U_FGVS0_12DataResponseS10__T_ RxAlamofire.swift:732 (RxAlamofire+0x0000000140f6)
    #8 _TPA__TFFFe11RxAlamofireRxC9Alamofire11DataRequestrV7RxSwift8Reactive14responseResultuRd__S0_30DataResponseSerializerProtocolrFT5queueGSqCSo13DispatchQueue_18responseSerializerqd___GCS2_10ObservableTCSo15HTTPURLResponsewd__16SerializedObject__U_FGVS2_11AnyObserverTS7_QQd__16SerializedObject__PS2_10Disposable_U_FGVS0_12DataResponseS10__T_ RxAlamofire.swift (RxAlamofire+0x000000018b53)
    #9 _TFFFC9Alamofire11DataRequest8responseuRxS_30DataResponseSerializerProtocolrFT5queueGSqCSo13DispatchQueue_18responseSerializerx17completionHandlerFGVS_12DataResponsewx16SerializedObject_T__DS0_U_FT_T_U_FT_T_ ResponseSerialization.swift:166 (Alamofire+0x00000006dbaa)
    #10 _TPA__TFFFC9Alamofire11DataRequest8responseuRxS_30DataResponseSerializerProtocolrFT5queueGSqCSo13DispatchQueue_18responseSerializerx17completionHandlerFGVS_12DataResponsewx16SerializedObject_T__DS0_U_FT_T_U_FT_T_ ResponseSerialization.swift (Alamofire+0x00000007b028)
    #11 _TTRgRx9Alamofire30DataResponseSerializerProtocolrXFo___XFdCb___ ResponseSerialization.swift (Alamofire+0x00000006dc55)
    #12 __tsan::invoke_and_release_block(void*) <null>:255 (libclang_rt.tsan_iossim_dynamic.dylib+0x00000005c3fb)
    #13 _dispatch_client_callout <null>:149 (libdispatch.dylib+0x00000002512d)

  Previous write of size 1 at 0x7d2000049ee0 by main thread (mutexes: write M9621):
    #0 _TFC7RxSwift9MergeSink2onfGOS_5Eventx_T_ Merge.swift:320 (RxSwift+0x000000157593)
    #1 _TTWu1_R_7RxSwift25ObservableConvertibleType0_S_12ObserverTypew_1Ezw0_1ErGCS_9MergeSinkxq_q0__S1_S_FS1_2onfGOS_5EventwxS3__T_ Merge.swift (RxSwift+0x00000015dee0)
    #2 _TFC7RxSwift4Sink9forwardOnfGOS_5Eventwx1E_T_ Sink.swift:29 (RxSwift+0x000000094bd7)
    #3 _TFC7RxSwift23AnonymousObservableSink2onfGOS_5Eventwx1E_T_ AnonymousObservable.swift:31 (RxSwift+0x0000001c3ef4)
    #4 _TTWuRx7RxSwift12ObserverTyperGCS_23AnonymousObservableSinkx_S0_S_FS0_2onfGOS_5Eventwx1E_T_ AnonymousObservable.swift (RxSwift+0x0000001c4610)
    #5 _TPA AnyObserver.swift (RxSwift+0x00000001b502)
    #6 _TFV7RxSwift11AnyObserver2onfGOS_5Eventx_T_ AnyObserver.swift:41 (RxSwift+0x000000019f4c)
    #7 _TFFFe11RxAlamofireRxC9Alamofire14SessionManagerrV7RxSwift8Reactive7requestuRd__S_18RxAlamofireRequestrFFzS1_qd__GCS2_10Observableqd___U_FGVS2_11AnyObserverQd___PS2_10Disposable_U_FPS_19RxAlamofireResponse_T_ RxAlamofire.swift:381 (RxAlamofire+0x00000000d1ac)
    #8 _TPA__TFFFe11RxAlamofireRxC9Alamofire14SessionManagerrV7RxSwift8Reactive7requestuRd__S_18RxAlamofireRequestrFFzS1_qd__GCS2_10Observableqd___U_FGVS2_11AnyObserverQd___PS2_10Disposable_U_FPS_19RxAlamofireResponse_T_ RxAlamofire.swift (RxAlamofire+0x000000018e73)
    #9 _TFFE11RxAlamofireC9Alamofire11DataRequest12responseWithFT17completionHandlerFPS_19RxAlamofireResponse_T__T_U_FVS0_19DefaultDataResponseT_ RxAlamofire.swift:346 (RxAlamofire+0x00000000b658)
    #10 _TPA__TFFE11RxAlamofireC9Alamofire11DataRequest12responseWithFT17completionHandlerFPS_19RxAlamofireResponse_T__T_U_FVS0_19DefaultDataResponseT_ RxAlamofire.swift (RxAlamofire+0x000000012619)
    #11 _TFFFC9Alamofire11DataRequest8responseFT5queueGSqCSo13DispatchQueue_17completionHandlerFVS_19DefaultDataResponseT__DS0_U_FT_T_U_FT_T_ ResponseSerialization.swift:126 (Alamofire+0x00000006be7f)
    #12 _TPA__TFFFC9Alamofire11DataRequest8responseFT5queueGSqCSo13DispatchQueue_17completionHandlerFVS_19DefaultDataResponseT__DS0_U_FT_T_U_FT_T_ ResponseSerialization.swift (Alamofire+0x00000007b263)
    #13 _TTRXFo___XFdCb___ Request.swift (Alamofire+0x000000040d35)
    #14 __tsan::invoke_and_release_block(void*) <null>:255 (libclang_rt.tsan_iossim_dynamic.dylib+0x00000005c3fb)
    #15 _dispatch_client_callout <null>:149 (libdispatch.dylib+0x00000002512d)
    #16 <redacted>
    #17 __invoking___ <null>:182 (CoreFoundation+0x00000007d44b)
    #18 start <null>:145 (libdyld.dylib+0x00000000468c)

  Location is heap block of size 120 at 0x7d2000049e80 allocated by main thread:
    #0 malloc <null>:255 (libclang_rt.tsan_iossim_dynamic.dylib+0x0000000404ba)
    #1 swift_slowAlloc <null>:206 (libswiftCore.dylib+0x000000221b28)
    #2 _TFC7RxSwift11FlatMapSinkCfT8selectorFzxq_8observerq0_6cancelPS_10Cancelable__GS0_xq_q0__ Merge.swift (RxSwift+0x0000001596dd)
    #3 _TFC7RxSwift7FlatMap3runuRd__S_12ObserverTypew_1Ezwd__1ErfTqd__6cancelPS_10Cancelable__T4sinkPS_10Disposable_12subscriptionPS5___ Merge.swift:366 (RxSwift+0x00000015e3f6)
    #4 _TFC7RxSwift8Producer9subscribeuRd__S_12ObserverTypexzwd__1Erfqd__PS_10Disposable_ Producer.swift:20 (RxSwift+0x00000023ef9b)
    #5 _TFC7RxSwift3Map3runuRd__S_12ObserverType_zwd__1ErfTqd__6cancelPS_10Cancelable__T4sinkPS_10Disposable_12subscriptionPS4___ Map.swift:136 (RxSwift+0x0000001e4a0c)
    #6 _TFC7RxSwift8Producer9subscribeuRd__S_12ObserverTypexzwd__1Erfqd__PS_10Disposable_ Producer.swift:20 (RxSwift+0x00000023ef9b)
    #7 _TFC7RxSwift19CombineLatestSink2_3runfT_PS_10Disposable_ CombineLatest+arity.swift:57 (RxSwift+0x000000168a44)
    #8 _TFC7RxSwift14CombineLatest23runuRd__S_12ObserverType0_zwd__1ErfTqd__6cancelPS_10Cancelable__T4sinkPS_10Disposable_12subscriptionPS4___ CombineLatest+arity.swift:87 (RxSwift+0x00000016b552)
    #9 _TFC7RxSwift8Producer9subscribeuRd__S_12ObserverTypexzwd__1Erfqd__PS_10Disposable_ Producer.swift:20 (RxSwift+0x00000023ef9b)
    #10 _TFC7RxSwift3Map3runuRd__S_12ObserverType_zwd__1ErfTqd__6cancelPS_10Cancelable__T4sinkPS_10Disposable_12subscriptionPS4___ Map.swift:136 (RxSwift+0x0000001e4a0c)
    #11 _TFFC7RxSwift8Producer9subscribeuRd__S_12ObserverTypexzwd__1ErFqd__PS_10Disposable_U_FT_PS3__ Producer.swift:28 (RxSwift+0x00000023f9c6)
    #12 _TPA__TFFC7RxSwift8Producer9subscribeuRd__S_12ObserverTypexzwd__1ErFqd__PS_10Disposable_U_FT_PS3__ Producer.swift (RxSwift+0x00000023ffd9)
    #13 _TTRG__Rd__7RxSwift12ObserverTypexzwd__1ErXFo__iPS_10Disposable__XFo_iT__iPS2___ DelaySubscription.swift (RxSwift+0x0000001da5dc)
    #14 _TPA__TTRG__Rd__7RxSwift12ObserverTypexzwd__1ErXFo__iPS_10Disposable__XFo_iT__iPS2___ Producer.swift (RxSwift+0x0000002401a8)
    #15 _TFC7RxSwift22CurrentThreadScheduler8scheduleurfTx6actionFxPS_10Disposable__PS1__ CurrentThreadScheduler.swift:64 (RxSwift+0x00000013a366)
    #16 _TFC7RxSwift8Producer9subscribeuRd__S_12ObserverTypexzwd__1Erfqd__PS_10Disposable_ Producer.swift:32 (RxSwift+0x00000023f6fc)
    #17 _TFE7RxSwiftPS_14ObservableType13subscribeSafeuRd__S_12ObserverTypewx1Ezwd__1Erfqd__PS_10Disposable_ ObservableType+Extensions.swift:117 (RxSwift+0x0000000c22f9)
    #18 _TFE7RxSwiftPS_14ObservableType9subscribefFGOS_5Eventwx1E_T_PS_10Disposable_ ObservableType+Extensions.swift:23 (RxSwift+0x0000000c1c66)
    #19 <redacted>
    #20 <redacted>
    #21 __invoking___ <null>:182 (CoreFoundation+0x00000007d44b)
    #22 start <null>:145 (libdyld.dylib+0x00000000468c)

  Mutex M9621 (0x7d1c0002f9c0) created at:
    #0 pthread_mutex_init <null>:255 (libclang_rt.tsan_iossim_dynamic.dylib+0x000000024a93)
    #1 -[NSRecursiveLock init] <null>:174 (Foundation+0x00000000231d)
    #2 _TFCSo15NSRecursiveLockCfT_S_ Debunce.swift (RxSwift+0x000000024231)
    #3 _TFC7RxSwift9MergeSinkcfT8observerq0_6cancelPS_10Cancelable__GS0_xq_q0__ Merge.swift:279 (RxSwift+0x000000156588)
    #4 _TFC7RxSwift11FlatMapSinkcfT8selectorFzxq_8observerq0_6cancelPS_10Cancelable__GS0_xq_q0__ Merge.swift:183 (RxSwift+0x0000001594ac)
    #5 _TFC7RxSwift11FlatMapSinkCfT8selectorFzxq_8observerq0_6cancelPS_10Cancelable__GS0_xq_q0__ Merge.swift (RxSwift+0x0000001596f5)
    #6 _TFC7RxSwift7FlatMap3runuRd__S_12ObserverTypew_1Ezwd__1ErfTqd__6cancelPS_10Cancelable__T4sinkPS_10Disposable_12subscriptionPS5___ Merge.swift:366 (RxSwift+0x00000015e3f6)
    #7 _TFC7RxSwift8Producer9subscribeuRd__S_12ObserverTypexzwd__1Erfqd__PS_10Disposable_ Producer.swift:20 (RxSwift+0x00000023ef9b)
    #8 _TFC7RxSwift3Map3runuRd__S_12ObserverType_zwd__1ErfTqd__6cancelPS_10Cancelable__T4sinkPS_10Disposable_12subscriptionPS4___ Map.swift:136 (RxSwift+0x0000001e4a0c)
    #9 _TFC7RxSwift8Producer9subscribeuRd__S_12ObserverTypexzwd__1Erfqd__PS_10Disposable_ Producer.swift:20 (RxSwift+0x00000023ef9b)
    #10 _TFC7RxSwift19CombineLatestSink2_3runfT_PS_10Disposable_ CombineLatest+arity.swift:57 (RxSwift+0x000000168a44)
    #11 _TFC7RxSwift14CombineLatest23runuRd__S_12ObserverType0_zwd__1ErfTqd__6cancelPS_10Cancelable__T4sinkPS_10Disposable_12subscriptionPS4___ CombineLatest+arity.swift:87 (RxSwift+0x00000016b552)
    #12 _TFC7RxSwift8Producer9subscribeuRd__S_12ObserverTypexzwd__1Erfqd__PS_10Disposable_ Producer.swift:20 (RxSwift+0x00000023ef9b)
    #13 _TFC7RxSwift3Map3runuRd__S_12ObserverType_zwd__1ErfTqd__6cancelPS_10Cancelable__T4sinkPS_10Disposable_12subscriptionPS4___ Map.swift:136 (RxSwift+0x0000001e4a0c)
    #14 _TFFC7RxSwift8Producer9subscribeuRd__S_12ObserverTypexzwd__1ErFqd__PS_10Disposable_U_FT_PS3__ Producer.swift:28 (RxSwift+0x00000023f9c6)
    #15 _TPA__TFFC7RxSwift8Producer9subscribeuRd__S_12ObserverTypexzwd__1ErFqd__PS_10Disposable_U_FT_PS3__ Producer.swift (RxSwift+0x00000023ffd9)
    #16 _TTRG__Rd__7RxSwift12ObserverTypexzwd__1ErXFo__iPS_10Disposable__XFo_iT__iPS2___ DelaySubscription.swift (RxSwift+0x0000001da5dc)
    #17 _TPA__TTRG__Rd__7RxSwift12ObserverTypexzwd__1ErXFo__iPS_10Disposable__XFo_iT__iPS2___ Producer.swift (RxSwift+0x0000002401a8)
    #18 _TFC7RxSwift22CurrentThreadScheduler8scheduleurfTx6actionFxPS_10Disposable__PS1__ CurrentThreadScheduler.swift:64 (RxSwift+0x00000013a366)
    #19 _TFC7RxSwift8Producer9subscribeuRd__S_12ObserverTypexzwd__1Erfqd__PS_10Disposable_ Producer.swift:32 (RxSwift+0x00000023f6fc)
    #20 _TFE7RxSwiftPS_14ObservableType13subscribeSafeuRd__S_12ObserverTypewx1Ezwd__1Erfqd__PS_10Disposable_ ObservableType+Extensions.swift:117 (RxSwift+0x0000000c22f9)
    #21 _TFE7RxSwiftPS_14ObservableType9subscribefFGOS_5Eventwx1E_T_PS_10Disposable_ ObservableType+Extensions.swift:23 (RxSwift+0x0000000c1c66)
    #22 <redacted>
    #23 <redacted>
    #24 __invoking___ <null>:182 (CoreFoundation+0x00000007d44b)
    #25 start <null>:145 (libdyld.dylib+0x00000000468c)

  Thread T2 (tid=884782, running) created by thread T-1
    [failed to restore the stack]

SUMMARY: ThreadSanitizer: data race Merge.swift:262 in _TFC7RxSwift13MergeSinkIter2onfGOS_5Eventw_1E_T_

Self contained code example that reproduces the issue:

Can't provide code. Though I'm using RxAlamofire and configured the responseResult to return on a different queue than the calling code, so starting there might help if reproduction is needed. Hopefully the report above is enough though.

RxSwift/RxCocoa/RxBlocking/RxTest version/commit

Using RxSwift 3.1.0.

Platform/Environment

  • [x] iOS
  • [ ] macOS
  • [ ] tvOS
  • [ ] watchOS
  • [ ] playgrounds

Xcode version:

Version 8.2 (8C38)

Installation method:

  • [ ] CocoaPods
  • [ ] Carthage
  • [x] Git submodules

I have multiple versions of Xcode installed:
(so we can know if this is a potential cause of your issue)

  • [ ] yes (which ones)
  • [x] no

Level of RxSwift knowledge:
(this is so we can understand your level of knowledge
and formulate the response in an appropriate manner)

  • [ ] just starting
  • [ ] I have a small code base
  • [x] I have a significant code base


All 9 comments

Hi @brentleyjones ,

Thanks for reaching out.

Did you maybe read the comments directly above Merge.swift:262?

This is something that would pop up in ThreadSanitizer as a data race, but I don't think it should be treated as one.

It's an optimization that enables an early return and avoids unnecessary locking for the completed event in the most common flatMap case, IMHO.

e.g.

   xs.flatMap { x in Observable.from(x) }

Having said that, I would like to add ThreadSanitizer to the unit tests. I think we might have a couple of situations like this one: they aren't unsafe, but they would be reported by ThreadSanitizer.

We could create conditional compilation code to mark those places more explicitly when thread sanitizer is running.

If TSAN notices it, doesn't it mean that a read and write can happen at the same time, thus giving back garbage data? I read the comment; this has nothing to do with the race on .completed, but with the instance variable _stopped. Since it is read outside of a lock, it can be read while it's being written to. I would not be a fan of conditional compilation, since production code should be clean under TSAN, which doesn't have false positives.

The new TSan (that is now in clang and gcc) works only in happens-before mode. And this mode does not have false positives.

Source: https://groups.google.com/forum/#!topic/thread-sanitizer/UuOzVuYWUA4
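For illustration, here is a minimal, self-contained example of the kind of unsynchronized access that happens-before mode flags (a sketch, unrelated to the RxSwift internals; compiling with -sanitize=thread would produce a report like the one above):

```swift
import Foundation

// A plain Bool written on a background queue and read on the calling
// thread, with no synchronization edge between the two accesses.
var flag = false
let group = DispatchGroup()

DispatchQueue(label: "writer").async(group: group) {
    flag = true   // unsynchronized write
}
_ = flag          // unsynchronized read, concurrent with the write above
group.wait()      // only after this is the write guaranteed visible
```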

Hi @brentleyjones ,

If TSAN notices it, doesn't it mean that a read and write can happen at the same time,

I would assume yes.

thus giving back garbage data?

Depends on what you mean by garbage data. In this case I would say no.

Let me try to elaborate.

// inner sequence
case .completed:
    _parent._group.remove(for: _disposeKey)
    // If this returned true, it means that `Completed` should be sent.
    // In case there is a race over who sends the first Completed,
    // the lock will sort it out. When the first Completed message is sent,
    // it sets the observer to nil and thus prevents further Completed
    // messages from being sent, preserving the sequence grammar.
    if _parent._stopped && _parent._group.count == MergeNoIterators {
        _parent._lock.lock(); defer { _parent._lock.unlock() } // lock {
            _parent.forwardOn(.completed)
            _parent.dispose()
        // }
    }

// outer sequence
case .completed:
    _lock.lock(); defer { _lock.unlock() } // lock {
        _stopped = true
        if _group.count == MergeNoIterators {
            forwardOn(.completed)
            dispose()
        }
        else {
            _sourceSubscription.dispose()
        }
    // }

The goal is to prove that completed will be sent exactly once in all cases.

Trivial facts (I'm not going to prove those):

  • _stopped will be toggled exactly once, from false to true.

Let's first prove that the completed event will be forwarded at least once.

1st case: at least one inner sequence reads _parent._stopped == true

  • If _parent._group.count == MergeNoIterators is true, there are no more active inner sequences.
  • If it is false, some inner sequences are still active. Since _stopped read true here, and since no code exists that toggles _stopped back to false, _stopped will be true for all inner sequences that are still active.
    Because _compositeDisposable serializes removal of subscriptions, we can say for sure that for the last inner sequence that removes its disposable, _parent._group.count == MergeNoIterators will be true. Because the removal and the check _parent._stopped && _parent._group.count == MergeNoIterators are not atomic together, the condition may be true for multiple inner sequences, but that's fine for now, since we first want to prove it will be satisfied at least once.

2nd case: every inner sequence reads _parent._stopped == false
If that is the case, then for every inner sequence, _parent._group.remove(for: _disposeKey) executed before _stopped = true did. You can prove this by assuming the opposite and deriving a contradiction: _parent._group.remove(for: _disposeKey) can be viewed as the inner sequence sending an "I'm done" message to the outer sequence, so the last inner sequence must have delivered that message before the outer sequence reached the assignment _stopped = true; otherwise not all inner sequences would have read _stopped == false.
In that case if _group.count == MergeNoIterators { will be satisfied (it has to be, because all inner sequences completed before _stopped = true was assigned), so forwardOn(.completed) will be sent at least once in this case as well.

Now we need to prove that it will be sent at most once.

All sending of the .completed event is executed under the lock.

// inner
_parent._lock.lock(); defer { _parent._lock.unlock() } // lock {
    _parent.forwardOn(.completed)
    _parent.dispose()
// }

// outer
_lock.lock(); defer { _lock.unlock() } // lock {
    _stopped = true
    if _group.count == MergeNoIterators {
        forwardOn(.completed)
        dispose()
    }

That means that those executions are serialized, and there will be at most one thread at a time executing

forwardOn(.completed)
dispose()

The first one that executes that pair sends the .completed event and performs dispose, which prevents any further completed events from being sent in the future.

That completes the proof it will be sent at most once.

I can understand why TSAN says we have a data race here, because we do. I'm trying to explain why it shouldn't matter in this particular case.
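If the unsynchronized fast path ever had to go, a TSAN-clean alternative (a sketch of the idea only, not RxSwift's actual code or its eventual fix) would be to do the inner-sequence bookkeeping under the lock as well, at the cost of locking on every inner completion:

```swift
import Foundation

// Fully locked variant: `stopped` and the inner-sequence count are only
// ever touched under the lock, so the "forward completed now?" decision
// is race-free and returns true exactly once.
final class LockedMergeSketch {
    private let lock = NSRecursiveLock()
    private var stopped = false
    private var activeInner = 0

    func innerStarted() {
        lock.lock(); defer { lock.unlock() }
        activeInner += 1
    }

    // Outer sequence completed: true if completed should be forwarded
    // now (no inner sequences left).
    func outerCompleted() -> Bool {
        lock.lock(); defer { lock.unlock() }
        stopped = true
        return activeInner == 0
    }

    // Inner sequence completed: true if this was the last inner
    // sequence after the outer one had already stopped.
    func innerCompleted() -> Bool {
        lock.lock(); defer { lock.unlock() }
        activeInner -= 1
        return stopped && activeInner == 0
    }
}
```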

The Rx.NET team also has the same optimizations; otherwise I wouldn't be so calm about these kinds of proofs.

I guess I don't know what would be returned in Swift when reading from a bool variable while it's being written to. I hope it can only be true or false, but maybe it's a segmentation fault? I know that for a counter variable these arguments wouldn't hold, because it may be 5 being set to 6, but if you read it while it's being written to you might get 1000 (corrupt data).

Anyway, TSAN is crashing in our unit tests over this. I would rather not have to write an exception for RxSwift code. Thank you for taking the time to reply.

Hi @brentleyjones ,

If we assume this was an int and not a bool, it wouldn't matter whether one read 5, 1000, or 6 :) The algorithm would still work the same.

The only issue would be if Swift somehow validated the boolean it reads from memory and crashed the program if it didn't contain one of the two expected values. I assume that would impose a significant performance penalty on the Swift runtime and would already have crashed someone's program (or mine), so I doubt it does that.

The algorithm only relies on the assumption that the effects of these lines aren't reordered:

_stopped = true
if _group.count == MergeNoIterators {

and

_parent._group.remove(for: _disposeKey)
if _parent._stopped && _parent._group.count == MergeNoIterators {

I've seen the Swift compiler do some weird things, and Swift doesn't have a volatile keyword, but I don't see a reason why it would reorder _stopped = true to after if _group.count == MergeNoIterators.

As for the CPU reordering instructions: inside _parent._group.remove(for: _disposeKey) and _group.count we unfortunately do need to call mutex->lock and mutex->unlock, but at least that has the positive effect of creating full memory barriers. That gives me reason to believe the CPU isn't reordering the effects of these instructions.

http://stackoverflow.com/questions/1581718/does-interlocked-compareexchange-use-a-memory-barrier

I'll try to investigate this and see whether we can do something simple to stop this from appearing in ThreadSanitizer. Right now I don't have enough information on how exactly ThreadSanitizer works.

Hi @brentleyjones ,

I've read the paper about ThreadSanitizer, http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.308.3963&rep=rep1&type=pdf (hoping this is the correct one), studied your link https://groups.google.com/forum/#!topic/thread-sanitizer/UuOzVuYWUA4, and tried to figure out how this relates to our code base.

After that I checked out the WWDC video on ThreadSanitizer, and it looks like I read that paper in vain, since the implementation is based on vector clocks and not on that paper :(

Fun times.

I think the only realistic short-term approach would be to introduce conditional compilation statements that are ThreadSanitizer-friendly.

That would serve two purposes: document those special parts, and let us easily test for performance regressions in them.
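The conditional-compilation idea might look roughly like this (the TSAN_ENABLED flag and all names here are hypothetical; such a flag would be set via -D in a sanitizer build):

```swift
import Foundation

final class ParentSketch {
    let lock = NSRecursiveLock()
    var stopped = false
}

// Under a sanitizer build, read the flag inside the lock so TSAN sees a
// synchronized access; otherwise keep the unsynchronized fast path.
func readStopped(_ parent: ParentSketch) -> Bool {
    #if TSAN_ENABLED
    parent.lock.lock(); defer { parent.lock.unlock() }
    return parent.stopped
    #else
    return parent.stopped // intentionally unsynchronized fast path
    #endif
}
```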

I think there are about a dozen places where we use tricks with atomics or other ThreadSanitizer-unfriendly code on really hot paths for performance reasons. As a general policy we use recursive mutexes for synchronization, so it shouldn't be too bad.

This is also related to additional performance optimizations I have on my plate.

I just updated a project from Swift 2.2 to Swift 3, so I also had to update RxSwift. I replaced every myArray.toObservable() with Observable.from(myArray) (some of them inside a flatMap), and it was broken. I tried to debug with an array of 15 elements on which a map was applied, followed by other operators. Only the first element enters the map, and then the debugger goes crazy; all the other operators are skipped.

I'm not sure this is completely related to this issue because it is too technical for me, but I thought you should know. Using myArray.forEach { observer.onNext($0) } makes it work correctly.

I believe this should be fixed now. Please let me know if it's not.

