I just migrated my project to Realm 4.3.0 from 3.7.1 and am encountering a weird issue.
This code works as expected, but it uses my own hacky way of returning a query as a Single, so I'd like to move away from it:
return Single.create { emitter ->
    val realm = RealmDatabase.getRealm()
    var result: KOptional<User> = KOptional.EMPTY
    realm.executeTransactionAsync({
        val entity = it.where(UserEntity::class.java)
            .equalTo(UserFields.USER_NAME, username, Case.INSENSITIVE)
            .findFirst()
        entity?.let {
            result = KOptional(UserMapper.toModel(it))
        }
    }, {
        realm.close()
        emitter.onSuccess(result)
    }, {
        realm.close()
        emitter.onError(it)
    })
}
However, using the new API + RxJava gives me an exception, shown in full below this code snippet:
val realm = RealmDatabase.getRealm()
return realm.where(UserEntity::class.java)
    .equalTo(UserFields.USER_NAME, username, Case.INSENSITIVE)
    .findFirstAsync()
    .asFlowable<UserEntity>()
    .filter { it.isLoaded }
    .firstOrError()
    .map {
        if (it.isValid) {
            val model = UserMapper.toModel(it)
            KOptional(model)
        } else {
            KOptional.EMPTY
        }.also {
            realm.close()
        }
    }
StackTrace:
java.lang.IllegalStateException: Accessing object of type which has been invalidated or deleted
at io.realm.internal.OsObject.nativeStartListening(Native Method)
at io.realm.internal.OsObject.addListener(OsObject.java:118)
at io.realm.ProxyState.addChangeListener(ProxyState.java:141)
at io.realm.RealmObject.addChangeListener(RealmObject.java:497)
at io.realm.RealmObject.addChangeListener(RealmObject.java:540)
at io.realm.rx.RealmObservableFactory$14.subscribe(RealmObservableFactory.java:457)
at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:72)
at io.reactivex.Flowable.subscribe(Flowable.java:12986)
at io.reactivex.internal.operators.flowable.FlowableFilter.subscribeActual(FlowableFilter.java:37)
at io.reactivex.Flowable.subscribe(Flowable.java:12986)
at io.reactivex.internal.operators.flowable.FlowableElementAtSingle.subscribeActual(FlowableElementAtSingle.java:41)
at io.reactivex.Single.subscribe(Single.java:2703)
at io.reactivex.internal.operators.single.SingleMap.subscribeActual(SingleMap.java:33)
at io.reactivex.Single.subscribe(Single.java:2703)
at io.reactivex.internal.operators.single.SingleZipArray.subscribeActual(SingleZipArray.java:64)
at io.reactivex.Single.subscribe(Single.java:2703)
at io.reactivex.internal.operators.single.SingleObserveOn.subscribeActual(SingleObserveOn.java:35)
at io.reactivex.Single.subscribe(Single.java:2703)
at io.reactivex.internal.operators.single.SingleFlatMapCompletable.subscribeActual(SingleFlatMapCompletable.java:44)
at io.reactivex.Completable.subscribe(Completable.java:1635)
at io.reactivex.internal.operators.completable.CompletablePeek.subscribeActual(CompletablePeek.java:51)
at io.reactivex.Completable.subscribe(Completable.java:1635)
at io.reactivex.internal.operators.completable.CompletableConcatArray$ConcatInnerObserver.next(CompletableConcatArray.java:89)
at io.reactivex.internal.operators.completable.CompletableConcatArray$ConcatInnerObserver.onComplete(CompletableConcatArray.java:65)
at io.reactivex.internal.operators.completable.CompletableObserveOn$ObserveOnCompletableObserver.run(CompletableObserveOn.java:90)
at io.reactivex.android.schedulers.HandlerScheduler$ScheduledRunnable.run(HandlerScheduler.java:109)
at android.os.Handler.handleCallback(Handler.java:751)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:154)
at android.app.ActivityThread.main(ActivityThread.java:6119)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776)
Realm version(s): 4.3.0
Realm sync feature enabled: no
Android Studio version: 3.0.1
Which Android version and device: 10.1 WXGA Emulator, API 25, 7.1.1
Oh boy, so it says that it tries to internally call addChangeListener on the object even after it was deleted and therefore invalidated.
I think this might be a mistake in the default RealmObservableFactory, on this line. @cmelchior
It should check if the item is valid, and if it is invalid, emit it once more, then complete the stream, to properly handle findFirstAsync() behavior.
Personally I recommend
return Realm.getDefaultInstance().use { realm ->
    realm.where(UserEntity::class.java)
        .equalTo(UserFields.USER_NAME, username, Case.INSENSITIVE)
        .findAllAsync() // <-- get RealmResults
        .asFlowable<UserEntity>()
        .filter { it.isLoaded }
        .map { users ->
            val user = users.first(null)
            if (user == null) KOptional.EMPTY else KOptional(UserMapper.toModel(user))
        }
}
or something like that.
I don't think I've ever found a use-case that works well with findFirstAsync behavior.
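The mapping step in that suggestion (take the first element of the loaded results, or fall back to an empty optional) can be looked at in isolation. This is only a rough sketch in plain Java: a `java.util.List` stands in for a loaded RealmResults and `java.util.Optional` stands in for KOptional, both substitutions made purely for illustration, and `FirstOrEmptySketch` / `firstOrEmpty` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical helper; a List models already-loaded query results.
final class FirstOrEmptySketch {

    // Maps the first result to a model, or returns an empty Optional when nothing matched.
    static <E, M> Optional<M> firstOrEmpty(List<E> loadedResults, Function<E, M> mapper) {
        if (loadedResults.isEmpty()) {
            return Optional.empty();
        }
        return Optional.ofNullable(mapper.apply(loadedResults.get(0)));
    }

    public static void main(String[] args) {
        List<String> noMatch = Collections.emptyList();
        List<String> oneMatch = Arrays.asList("alice");
        System.out.println(firstOrEmpty(noMatch, name -> name.toUpperCase()).isPresent());
        System.out.println(firstOrEmpty(oneMatch, name -> name.toUpperCase()).get());
    }
}
```

Because an empty result set is an ordinary value here, "not found" never has to be represented by an invalid object.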
Odd, indeed. I might just keep my current implementation then. Having to use findAllAsync just seems wrong to me.
@unosmiles Not really, if you want to listen for a Flowable instead of a Single, then I definitely recommend findAll* over findFirst*, because RealmResults never terminates, but RealmObject does terminate after deletion (or at least it should, but doesn't, hence your crash).
It does appear we have a bug in the API, as https://github.com/realm/realm-java/blob/master/realm/realm-library/src/main/java/io/realm/RealmObject.java#L667 just passes on the object, which might be deleted.
I'm not sure what the correct semantics would be there, though. Throw an exception? Call onError or onSuccess immediately? Normally we throw immediately if a RealmObject is accessed in a wrong state, but maybe deletedObj.asFlowable() should call onError instead.
That aside, the original code in this issue should not fail this way as a findFirstAsync() should never return a deleted object.
@cmelchior check if the object is valid or not before adding the change listener. If it's invalid, then emit the object (if the emitter is not disposed), then call emitter.onComplete() as well.
That would be synonymous to the current change listener behavior.
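To make the proposed ordering concrete, here is a minimal model of it in plain Java. Everything in it is a hypothetical stand-in (`ManagedObject`, `ObjectEmitter`, `FakeObject`, `RecordingEmitter`, `subscribe`); none of it is Realm or RxJava API, and it is not the actual RealmObservableFactory code. It only shows the idea: check validity first, and for an invalid object emit once and complete instead of registering a listener.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; these are not Realm or RxJava types.
interface ManagedObject {
    boolean isValid();
    void addChangeListener(Runnable listener);
}

interface ObjectEmitter<T> {
    boolean isCancelled();
    void onNext(T item);
    void onComplete();
}

// Fake object whose listener registration fails once it is invalid,
// mirroring the IllegalStateException in the stack trace.
final class FakeObject implements ManagedObject {
    final boolean valid;
    final List<Runnable> listeners = new ArrayList<>();

    FakeObject(boolean valid) { this.valid = valid; }

    @Override public boolean isValid() { return valid; }

    @Override public void addChangeListener(Runnable listener) {
        if (!valid) throw new IllegalStateException("object invalidated or deleted");
        listeners.add(listener);
    }
}

// Emitter that records which callbacks it received.
final class RecordingEmitter<T> implements ObjectEmitter<T> {
    final List<String> events = new ArrayList<>();
    @Override public boolean isCancelled() { return false; }
    @Override public void onNext(T item) { events.add("next"); }
    @Override public void onComplete() { events.add("complete"); }
}

final class InvalidObjectEmissionSketch {

    static <T extends ManagedObject> void subscribe(T obj, ObjectEmitter<T> emitter) {
        if (!obj.isValid()) {
            // Invalid object: emit it once, complete, and never touch the listener API.
            if (!emitter.isCancelled()) {
                emitter.onNext(obj);
                emitter.onComplete();
            }
            return;
        }
        // Valid object: register for changes, then emit the initial value.
        obj.addChangeListener(() -> {
            if (!emitter.isCancelled()) emitter.onNext(obj);
        });
        emitter.onNext(obj);
    }

    public static void main(String[] args) {
        RecordingEmitter<FakeObject> forDeleted = new RecordingEmitter<>();
        subscribe(new FakeObject(false), forDeleted);
        System.out.println(forDeleted.events); // prints [next, complete]

        RecordingEmitter<FakeObject> forLive = new RecordingEmitter<>();
        subscribe(new FakeObject(true), forLive);
        System.out.println(forLive.events); // prints [next]
    }
}
```

Without the validity check, the first call in `main` would throw from `addChangeListener`, which is the shape of the crash reported above.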
Although I do admit, an object returned by findFirstAsync() should not be invalid. Unless maybe the object was not found?
In my case, I can guarantee the object is there. If I hit a breakpoint right before my method is called and do a blocking query with findFirst, I receive a valid object.
If it's a pending row, then it shouldn't fail... there are integration tests for it and all that. :confused:
This kinda makes me secretly hope that one day findFirstAsync will be completely removed, and it'll be recommended to use findAllAsync() and .first(null) instead.
@unosmiles There might be timing issues with testing that way, but in any case, the current behavior should definitely be fixed.
@Zhuinden The current behavior doesn't complete the stream when an invalid object is emitted, but it probably should since no more objects will be sent in any case. I would be a bit hesitant to make that change in a patch release, but would probably be fine in a minor release (even though it is technically breaking current semantics).
I do agree with you, however, that the current findFirst() is too confusing and I would like to switch it to RealmOptional<Foo> obj = query.findFirst() at some point: #5179
@unosmiles I'm not able to reproduce this behavior. Clearly, there is an issue somewhere, but a simple test like this doesn't reproduce it:
@Test
@RunTestInLooperThread
public void findFirstAsync_supportInvalidObjects() {
    final Realm realm = looperThread.getRealm();
    realm.beginTransaction();
    AllTypes obj = realm.createObject(AllTypes.class);
    obj.deleteFromRealm();
    realm.commitTransaction();
    subscription = obj.<AllTypes>asFlowable()
            .subscribe(new Consumer<AllTypes>() {
                @Override
                public void accept(AllTypes obj) throws Exception {
                    assertFalse(obj.isValid());
                    looperThread.testComplete();
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    fail(throwable.toString());
                }
            });
}
Nor do any other variants I have tried. Can you create a sample project that reproduces the behavior?
I also tried with your example code here:
val realm = RealmDatabase.getRealm()
return realm.where(UserEntity::class.java)
    .equalTo(UserFields.USER_NAME, username, Case.INSENSITIVE)
    .findFirstAsync()
    .asFlowable<UserEntity>()
    .filter { it.isLoaded }
    .firstOrError()
    .map {
        if (it.isValid) {
            val model = UserMapper.toModel(it)
            KOptional(model)
        } else {
            KOptional.EMPTY
        }.also {
            realm.close()
        }
    }
But I could not get it to crash either.
After further testing, I think this is more of a timing issue.
I'm using RxJava + Realm to complete this flow:
1) Completable: Clear all Users from Realm.
2) Completable: Persist a new collection of Users to Realm.
3) Single<KOptional<User>>: Fetch a User with a specified userId.
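As a rough illustration of the ordering those three steps rely on, here is a sketch in plain Java using `CompletableFuture` in place of RxJava and an in-memory map in place of Realm. Both are substitutions for illustration only, every name in it (`SequencedFlowSketch`, `clearUsers`, `persistUsers`, `fetchUser`, `refreshAndFetch`) is hypothetical, and it is not a diagnosis of the crash. It just shows each step starting only after the previous one has completed.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the clear -> persist -> fetch flow; the map stands in for the database.
final class SequencedFlowSketch {
    private final Map<String, String> store = new ConcurrentHashMap<>();

    // Step 1: clear all users.
    CompletableFuture<Void> clearUsers() {
        return CompletableFuture.runAsync(store::clear);
    }

    // Step 2: persist a new collection of users (userId -> userName).
    CompletableFuture<Void> persistUsers(Map<String, String> users) {
        return CompletableFuture.runAsync(() -> store.putAll(users));
    }

    // Step 3: fetch one user by id; empty if it is not present.
    CompletableFuture<Optional<String>> fetchUser(String userId) {
        return CompletableFuture.supplyAsync(() -> Optional.ofNullable(store.get(userId)));
    }

    // thenCompose starts the next step only once the previous future has completed.
    CompletableFuture<Optional<String>> refreshAndFetch(Map<String, String> users, String userId) {
        return clearUsers()
                .thenCompose(ignored -> persistUsers(users))
                .thenCompose(ignored -> fetchUser(userId));
    }

    public static void main(String[] args) {
        SequencedFlowSketch flow = new SequencedFlowSketch();
        Optional<String> user =
                flow.refreshAndFetch(Collections.singletonMap("42", "alice"), "42").join();
        System.out.println(user); // prints Optional[alice]
    }
}
```

If the fetch were started concurrently with the clear instead of being chained after the persist, it could observe the store between the two writes, which is the kind of window a timing issue would need.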
It must be some sort of timing issue, maybe to do with an opened / not yet closed Realm, because this doesn't always crash, as you are experiencing, @cmelchior. I haven't seen this issue crop up using 4.3.1 often enough to devote any more time to it, so you can mark this as closed if you'd like. Thanks for your help.
Please re-open if you encounter this again.