Realm-cocoa: Issue with updates when changes happen on background thread are started before subscription

Created on 23 Mar 2017  路  13Comments  路  Source: realm/realm-cocoa

Goals

I am trying to subscribe using RealmSwift + RxRealm to changes to a collection with a simple query, I need to be able to get the current results and also any changes that happen later.

Expected Results

I expect that creating observable query should get the current state of the data base and new changes.

Actual Results

Currently this works as long as initial changes to objects in database either happen on same thread OR the background updates to changes are started AFTER the subscription has happened.

Steps to Reproduce

  • Create a simple query returning the array of objects as an observable.

  • Kick off a background update to an objet in the database

  • subscribe to the changes to that query

  • Observe you only get the state of the object before the background update, the changes made by the background update are never received.

** NOTE: If you kick off the background update AFTER the subscription happened this works fine.

** NOTE2: In my app it is not possible to know if there is already an update to the database happening before subscriptions happen , so it is not feasible to always subscribe first, subscriptions happen at any time during the course of the app for UI updates.

Code Sample

func testRXRealm() {

        let waitForStart = self.expectation(description: "We should get a start event for our operation")

        do {

            guard let accountId = self.accountOperation?.account?.id else {

                XCTFail("No account op")

                return
            }

            let realm = try Realm()

            let operations = realm.objects(AccountOperation.self).filter("account.id = %@", accountId)

            let obsOperations = Observable.array(from: operations).share()

            guard let accountOp = self.accountOperation?.detached() else {

                XCTFail("We should get accountOP")

                return
            }


            realm.beginWrite()

            accountOp.taskState = .Executing

            realm.add(accountOp, update: true)

            try realm.commitWrite()


            guard let accountOp2 = self.accountOperation?.detached() else {

                XCTFail("We should get accountOP")

                return
            }


            self.queryQueue.async {

                let bgRealm = try! Realm()

                bgRealm.beginWrite()

                accountOp2.taskState = .Finished

                bgRealm.add(accountOp2, update: true)

                try! bgRealm.commitWrite()
            }


            obsOperations.subscribe(onNext: { (operations) in

                XCTAssertGreaterThan(operations.count, 0, "There should be more than 0 operations in the database")

                for op in operations {

                    switch op.taskState {

                    case .Executing:

                        print("Executing")


                    case .Finished:

                        print("Finished")

                        waitForStart.fulfill()


                    case .Init:

                        print("Init")

                    default:

                        break
                    }
                }

            }, onError: { (error) in

                print("error: \(error.localizedDescription)")

            }, onCompleted: { 

                print("complete")

            }, onDisposed: nil).addDisposableTo(self.disposeBag)




        }
        catch let error {

            error.logError()

            XCTFail("\(error.localizedDescription)")
        }



        self.waitForExpectations(timeout: 5) { (error) in

            if let error = error {

                error.logError()

            }
        }
    }

Version of Realm and Tooling

ProductName:    Mac OS X
ProductVersion: 10.12.3
BuildVersion:   16D32

/Applications/Xcode.app/Contents/Developer
Xcode 8.2.1
Build version 8C1002

/usr/local/bin/pod
1.2.0
(not in use here)

/bin/bash
GNU bash, version 3.2.57(1)-release (x86_64-apple-darwin16)

/usr/local/bin/carthage
0.20.1
(not in use here)

/usr/bin/git
git version 2.10.1 (Apple Git-78)

Realm version:

github "RxSwiftCommunity/RxRealm" "0.5.2"
github "realm/realm-cocoa" "v2.4.4"

T-Help

All 13 comments

Hi @grangej. Thanks for reaching out about this. I wanted to let you know that we've seen your issue and that someone will be following-up with you soon. While you wait, would you be able to provide us with exactly which Realm version you're using (and not just "Latest") since sometimes we send out updates in between response times. Thanks!

@istx25 Thank you for following up.

The versions that we are using here are:

github "RxSwiftCommunity/RxRealm" "0.5.2"
github "realm/realm-cocoa" "v2.4.4"

We have ran some additional tests this morning and have some additional findings:

If you create two background threads even AFTER the subscription only one of them seems to emit changes.

For example, the following code only one of the changes will ever be observed.

// subscribe here to observable

self.queryQueue.async {

                let bgRealm = try! Realm()

                bgRealm.beginWrite()

                accountOp.taskState = .Executing

                bgRealm.add(accountOp, update: true)

                try! bgRealm.commitWrite()

            }

            self.queryQueue.async {

                let bgRealm = try! Realm()

                bgRealm.beginWrite()

                accountOp.taskState = .Finished

                bgRealm.add(accountOp, update: true)

                try! bgRealm.commitWrite()

            }

If we change the code to the following though both changes will be seen:

            self.queryQueue.async {

                let bgRealm = try! Realm()

                bgRealm.beginWrite()

                accountOp.taskState = .Executing

                bgRealm.add(accountOp, update: true)

                try! bgRealm.commitWrite()

                bgRealm.beginWrite()

                accountOp2.taskState = .Finished

                bgRealm.add(accountOp2, update: true)

                try! bgRealm.commitWrite()
            }

Hi @grangej! I can't help but feel like this might be an issue more on RxRealm's side of things than Realm Swift directly. Have you considered filing an issue on the RxRealm repo?

@TimOliver Let me test with straight realm code and I will let you know.

@TimOliver You are correct I tried the following code: and it works , but looking at the RxRealm code for changeset it makes no sense why it would not work. I stepped through the code in RxRealm and the observer is never receiving one of the notifications in the notification block. So I still think this is a RealmSwift issue since the RxRealm code block never receives one of the update notifications for some reason. Thread issue? Race condition? Something funny here.

`
// This does not work, I end up losing one of the notifications (in the middle in this case)

let changeNote = Observable.changeset(from: operations, synchronousStart: false).share()
changeNote.subscribe(onNext: { (objects, changes) in

            print(objects.first?.taskState)

            if changes != nil {

                print("Updates received ")


                count = count + 1
            }



        }, onError: { (error) in

            print("error: \(error.localizedDescription)")

        }, onCompleted: {

            print("complete")

        }, onDisposed: {

            print("Disposed")

        }).addDisposableTo(self.disposeBag)`

// for reference the relevant RxRealm code

public static func collection(from collection: E, synchronousStart: Bool = true) -> Observable<E> {
    return Observable.create {observer in
        if synchronousStart {
            observer.onNext(collection)
        }

        let token = collection.addNotificationBlock {changeset in

         // THIS Block never gets called for some updates when I set breakpoint in here. 

            let value: E

            switch changeset {
                case .initial(let latestValue):
                    guard !synchronousStart else { return }
                    value = latestValue

                case .update(let latestValue, _, _, _):
                    value = latestValue

                case .error(let error):
                    observer.onError(error)
                    return
            }

            observer.onNext(value)
        }

        return Disposables.create {
            token.stop()
            observer.onCompleted()
        }
    }
}

`

// this does work, but I suspect that it may be simply a timing issue so it may not /always/ work.
self.realmToken = operations.addNotificationBlock({ (changes) in

            switch changes {

            case .update(let objects, deletions: let deletions, insertions: let insertions, modifications: let modifications):

                count = count + 1

                print("Realm Update Notification")

                print("Modifications: \(modifications)")
                print("Deletions: \(deletions)")
                print("insertions: \(insertions)")

                print("Updated Objects: \(objects)")

                for object in objects {

                    if !fulfilled {

                        print("Object TaskState: \(object.taskState)")

                        if object.taskState == .Finished {

                            fulfilled = true
                            waitForStart.fulfill()
                        }

                    }

                }

            case .initial(let objects):

                print("Realm Initial Notification")

                print(objects)

            case .error(let error):

                print("Error: \(error.localizedDescription)")


            }


        })

`

I can confirm the reported issue is reproducible, will try to find out why does it happen.

If you create the observable asynchronously you will receive the background thread changes like so:

DispatchQueue.global(qos: .background).async {
    let realm = try! Realm()
    let obj = realm.objects(Test.self).first!

    try! realm.write {
        obj.name = "Test0"
    }
}

Observable.array(from: tests, synchronousStart: false)
    .subscribe(onNext: { value in
        print("rx next: \(flatten(value))")
    })
    .addDisposableTo(bag)

By default the Observable will emit its first element synchronously (to prevent delays when binding UI controls) since Realm's initial update is emitted asynchronously - there might be a gap between the two, which gets swollen by the code in RxRealm.

Thanks for catching this, will address it in the RxRealm code

@icanzilb Not sure what is different from your background queue than mine? self.queryQueue is simply a named DispatchQueue.

the difference is in the value of the synchronousStart parameter, it's by default true and above I'm using false. Using the default true in combination with a racing write is what triggers the issue you found.

@icanzilb hmmmm I did try that as well on my end, I will go back and double check my tests.

please do. I followed the "steps to reproduce" included in your issue, I reproduced the described problem and found where the issue was in RxRealm's code (and using synchronousStart=false solves it for me). In case you still have an issue - could you update the steps to reproduce or attach a simple project, which shows what the other issue is?

@icanzilb no problem I will try and get this back to you by tomorrow. Thanks for investigating.

Just catching up on some of the older issues now. :)

Since the cause of this issue is being tracked in the RxRealm repo, I'm going to close this one. If you identify anything that is a bug in Realm Cocoa itself, please feel free to open a new issue. :)

Thanks!

Was this page helpful?
0 / 5 - 0 ratings