Realm-java: RxJava 2 support

Created on 27 Sep 2016  路  40Comments  路  Source: realm/realm-java

Goal

Use RxJava 2.0 with realm.io, since it will be released soon

Expected Results

Add support for the RxJava 2 same as it was added for the RxJava 1

Design-Required T-Feature

Most helpful comment

@pavel-ismailov RxJava support is being merged into this feature branch currently https://github.com/realm/realm-java/pull/4991

We are done with moving the API's to RxJava2 + add support for changeset observables as well. It will be released with Realm Java 4.0 which currently is scheduled for release in a months time.

All 40 comments

Hi @tonyshkurenko
I haven't dug that deep into RxJava 2 yet, but Realm uses very little of the RxJava 1.* API's, so I would actually expect that we would automatically be compatible with RxJava 2.

I haven't tested it yet though. Are you thinking of something in particular with regards to RxJava2 ?

Hey! Thanks for the answer!

I have a pet project with realm and rxjava1 where I test everything, I wanted to refactor it for the rxjava2, to understand it. Currently, I don't have any info about it

Ok. Testing it should be easy though.
Just add io.reactivex:rxjava:2.0.0-RC3 a dependency to your project and try to:

Realm realm = Realm.getDefaultInstance();
realm.where(Foo.class).findAllAsync().asObservable().subscribe(...);

RxJava 2 Observables live under io.reactivex.Observable while RxJava 1 Observables live under rx.Observable. This is a big problem for us as it not only breaks our RxObservableFactory interface, but also means we cannot have one method asObservable() on RealmResults/Realm/RealmObject that can support both RxJava 1 and 2.

Looks like we have to rethink how to support RxJava going forward.

For reference this was the old design document: https://github.com/realm/realm-java/pull/1651

The primary reason for going with the current approach was that at the time RxJava and RxJava2 were expected to be interface compatible, at least for the Observable class which is the only public class we expose.

Unfortunately this turned out to not be the case ( http://jakewharton.com/java-interoperability-policy-for-major-version-updates/ ), which while good for app developers, are very breaking for libraries like Realm that expose types from these libraries.

This leaves us with two choices IMO:

1) Keep the current approach but add a new asObservable2() or similar (bikeshed the name all you want)

realm.where(Person.class).findAll().asObservable();
realm.where(Person.class).findAll().asObservable2();

2) Go back to the original RxRealm.asObservable(obj) proposal: https://github.com/realm/realm-java/pull/1651/files#diff-ea7c3e46ce5758860592a8c6d0dc678dR63

RxRealm.asObservable(realm.where(Person.class).findAll());
RxRealm.asFlowable(realm.where(Person.class).findAll());

I'm slightly leaning towards 2 right now, while the less fluent style will be annoying, it provides a lot of benefits:

  • We can add it without removing our current support for RxJava1
  • We are resilient to further breaking changes in RxJava
  • We can provide implementations for both RxJava1 and RxJava2 this way as separate dependencies.
  • Allow us to expose newer / more advanced types without being afraid of backwards compatibility, e.g. Single, Flowable, Maybe, Completable

Other references:

Thoughts @realm/java ?

Since the model interface already means you need to pass the query or object to a static helper. I would vote for option 2

I like realm.where(Person.class).findAll().asObservable2(); because it's what StorIO does also

But I also think Realm doesn't really do backpressure, does it? So no point in Flowable

Is there any progress on this @cmelchior? If you decide to go with asObservable2(), would you accept a PR?

Not any progress right now, sorry. I don't think we will be going with asObservable2() though. Consensus seems to be going drifting towards splitting support into a helper RxRealm class. This will also allow us to support more observable types like asObservable()/asFlowable()/asCompletable()

@cmelchior I'm not sure why you need any type beyond Observable, though.

Although maybe I'm just not looking past the box of Rx1 support.

RxJava2 handles backpressure differently. Flowable is if you want backpressure support, Observable if not. Completable is useful for transactions since it can only either succeed or fail

But transactions are handled on Realm and not on RealmResults. You could just have executeTransactionAsCompletable() or something.

That name sounds bloaty. :slightly_frowning_face:

I can imagine RxRealm.executeTransaction(realm, Action<Realm>) though.

@patloew This issue is about RxJava2 support across the board, not just on RealmResults. The problem with supporting multiple different types is that:

1) It will bloat the API surface
2) Some types are not available on all versions of RxJava

Unfortunately splitting into a separate class reduces the fluent nature of the current API.

One idea could perhaps be to utilize this #3869 which would enable us to make Rx subclasses of all our current ones. This would be slightly more work, but couldn't work (I haven't thought it entirely through yet though).

@Zhuinden Not sure what the Action is?

I created this gist to quickly get RxJava 2 Observables from RealmResults, until official RxJava 2 support is shipping. This small class might be useful, if you want to migrate already and don't want two additional dependencies (RxJava 1 and Interop libs).

Another +1 for the non-fluent interface RxRealm.asObservable(. Besides the reasons pointed out by @cmelchior already (supporting other variants such asFlowable etc.), it potentially could also divorce the dependencies on RxJava

So folks who don't want to use RxJava don't have to have the import. For the folks who do want to use RxJava, it allows flexibility in quickly upgrading and keeping up with the changes of RxJava.

At this point, has the above question (of which way to go fluent/wrapper) been answered? That answer is probably a pre-requisite to the necessary action items

Any update on this?

@nicolausYes Not yet. It is very likely that we will go with the non-fluent variant though, which means that you could easily implement this today by copying https://github.com/realm/realm-java/blob/master/realm/realm-library/src/main/java/io/realm/rx/RealmObservableFactory.java, update it to RxJava2 and turn the methods into public static methods.

for the lazy ones:

``` java
private io.reactivex.Flowable getRealm() {
return io.reactivex.Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter)
throws Exception {
RealmConfiguration realmConfiguration = realm.getConfiguration();
Realm observableRealm = Realm.getInstance(realmConfiguration);

            final RealmChangeListener<Realm> listener = _realm -> {
                emitter.onNext(_realm);
            };
            emitter.setDisposable(Disposables.fromRunnable(() -> {
                observableRealm.removeChangeListener(listener);
                observableRealm.close();
            }));
            observableRealm.addChangeListener(listener);
            emitter.onNext(observableRealm);
        }
    }, BackpressureStrategy.LATEST)

```

Nice 馃憤. I haven't dug that deep into RxJava2, so cannot comment on the correctness 馃槃. Also it will probably work for Realm, but we had to add strong references to things like RealmResults to prevent them from being GC'ed, e.g https://github.com/realm/realm-java/blob/master/realm/realm-library/src/main/java/io/realm/rx/RealmObservableFactory.java#L137

@patloew thx man, you helped me a lot.
As Patrick gist about RealmResults.asObservable, I created this gist for RealmObject.asObservable usage with RxJava2 with example of migration 1->2

@mtychyna @patloew use your code , i got

java.lang.IllegalStateException: Realm access from incorrect thread. Realm instance can only be closed on the thread it was created.

how can i resolve the error

@Module
public class ApplicationModule {
 @Provides
    @ApplicationScope
    public Realm realm() { return Realm.getDefaultInstance();}
}
public class MainActivity {
    @Inject
    Realm realm;

   @Override
    protected void onStart() {
        super.onStart();
        Single.zip((new RealmService()).allArticles(realm),
                Single.timer(MINIMUM_LOADING_TIME, TimeUnit.MILLISECONDS),
                (obs, timer) -> Observable.empty())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe();
    }
}



md5-bc4455b9dbda01402982ed378bc8e341




public class RealmService {
    public RealmService() {
    }
    public Single<List<Article>> allArticles(Realm realm) {
        return RealmResultsSingle.from(realm.where(Article.class).findAll())
                .map(result -> result);
    }
}

@eccentricyan well they never said you should send a UI-thread Realm instance to a background service. (Read here)

@Zhuinden thx. i resolve it by change Schedulers.io() to AndroidSchedulers.mainThread()

Using Realm (plugin v. 3.0.0 with Retrofit + SimpleXML parser + RxJava2 right now actually doesn't work -- I get an exception when the parser attempts to parse the RealmObject defined within my model...

io.reactivex.exceptions.UndeliverableException: java.lang.NoClassDefFoundError: rx.Observable

I thought originally it was SimpleXML doing some weird thing depending on RxJava 1.x, but in testing, the app only gives me that exception when I make my models extend RealmObject. I don't actually have to Initialize Realm or even attempt to create or persist to a Realm Database -- it's the same stacktrace solely when the Simple XML parser -- upon response from the server -- attempts to serialize/deserialze the RealmObject. Stacktrace isn't of much help. Been going crazy for hours trying to track it down.

Project level build.gradle:

buildscript {
    repositories {
        jcenter()
    }
    dependencies {
        classpath 'com.android.tools.build:gradle:2.3.0'
        classpath "io.realm:realm-gradle-plugin:3.0.0"
    }
}

app build.gradle:

apply plugin: 'com.android.application'
apply plugin: 'realm-android'

android {
    compileSdkVersion 25
    buildToolsVersion "25.0.2"
    defaultConfig {
        applicationId "com.someapp.xmlrx2test"
        minSdkVersion 17
        targetSdkVersion 25
        versionCode 1
        versionName "1.0"
        testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"
    }
    buildTypes {
        debug {
            debuggable true
        }
        release {
            minifyEnabled false
            proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
        }
    }
}

dependencies {
    compile fileTree(dir: 'libs', include: ['*.jar'])
    androidTestCompile('com.android.support.test.espresso:espresso-core:2.2.2', {
        exclude group: 'com.android.support', module: 'support-annotations'
    })
    compile 'com.android.support:appcompat-v7:25.3.0'
    testCompile 'junit:junit:4.12'

    //Required Dagger (Dependency Injection) Dependencies
    compile 'com.google.dagger:dagger:2.10'
    annotationProcessor 'com.google.dagger:dagger-compiler:2.10'

    // Retrofit & okhttp
    compile('com.squareup.retrofit2:converter-simplexml:2.2.0') {
        exclude group: 'xpp3', module: 'xpp3'
        exclude group: 'stax', module: 'stax-api'
        exclude group: 'stax', module: 'stax'
    }
    compile('com.squareup.retrofit2:retrofit:2.2.0') {
        exclude module: 'okhttp'
    }
    compile 'com.squareup.retrofit2:adapter-rxjava2:2.2.0'
    compile 'com.squareup.okhttp3:okhttp:3.6.0'

    //Rx2
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    // Because RxAndroid releases are few and far between, it is recommended you also
    // explicitly depend on RxJava's latest version for bug fixes and new features.
    compile 'io.reactivex.rxjava2:rxjava:2.0.7'
}

full stack here:

FATAL EXCEPTION: RxCachedThreadScheduler-1
                                                                         Process: com.someapp.xmlrx2test, PID: 28908
                                                                         io.reactivex.exceptions.UndeliverableException: java.lang.NoClassDefFoundError: rx.Observable
                                                                             at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
                                                                             at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:62)
                                                                             at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
                                                                             at java.util.concurrent.FutureTask.run(FutureTask.java:237)
                                                                             at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
                                                                             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
                                                                             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
                                                                             at java.lang.Thread.run(Thread.java:761)
                                                                          Caused by: java.lang.NoClassDefFoundError: rx.Observable
                                                                             at libcore.reflect.InternalNames.getClass(InternalNames.java:55)
                                                                             at java.lang.Class.getDexCacheType(Class.java:2551)
                                                                             at java.lang.reflect.Method.getReturnType(Method.java:150)
                                                                             at java.lang.Class.getDeclaredMethods(Class.java:1811)
                                                                             at org.simpleframework.xml.core.DetailScanner.methods(DetailScanner.java:408)
                                                                             at org.simpleframework.xml.core.DetailScanner.scan(DetailScanner.java:366)
                                                                             at org.simpleframework.xml.core.DetailScanner.<init>(DetailScanner.java:140)
                                                                             at org.simpleframework.xml.core.DetailExtractor.getDetail(DetailExtractor.java:107)
                                                                             at org.simpleframework.xml.core.Support.getDetail(Support.java:278)
                                                                             at org.simpleframework.xml.core.ClassScanner.scan(ClassScanner.java:279)
                                                                             at org.simpleframework.xml.core.ClassScanner.<init>(ClassScanner.java:112)
                                                                             at org.simpleframework.xml.core.ObjectScanner.<init>(ObjectScanner.java:78)
                                                                             at org.simpleframework.xml.core.ScannerFactory.getInstance(ScannerFactory.java:81)
                                                                             at org.simpleframework.xml.core.Support.getScanner(Support.java:357)
                                                                             at org.simpleframework.xml.core.Source.getScanner(Source.java:271)
                                                                             at org.simpleframework.xml.core.Source.getSchema(Source.java:317)
                                                                             at org.simpleframework.xml.core.Composite.read(Composite.java:198)
                                                                             at org.simpleframework.xml.core.Composite.read(Composite.java:148)
                                                                             at org.simpleframework.xml.core.Traverser.read(Traverser.java:92)
                                                                             at org.simpleframework.xml.core.Persister.read(Persister.java:625)
                                                                             at org.simpleframework.xml.core.Persister.read(Persister.java:606)
                                                                             at org.simpleframework.xml.core.Persister.read(Persister.java:584)
                                                                             at org.simpleframework.xml.core.Persister.read(Persister.java:562)
                                                                             at retrofit2.converter.simplexml.SimpleXmlResponseBodyConverter.convert(SimpleXmlResponseBodyConverter.java:36)
                                                                             at retrofit2.converter.simplexml.SimpleXmlResponseBodyConverter.convert(SimpleXmlResponseBodyConverter.java:23)
                                                                             at retrofit2.ServiceMethod.toResponse(ServiceMethod.java:118)
                                                                             at retrofit2.OkHttpCall.parseResponse(OkHttpCall.java:212)
                                                                             at retrofit2.OkHttpCall.execute(OkHttpCall.java:174)
                                                                             at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:41)
                                                                             at io.reactivex.Observable.subscribe(Observable.java:10700)
                                                                             at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
                                                                             at io.reactivex.Observable.subscribe(Observable.java:10700)
                                                                             at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
                                                                             at io.reactivex.Scheduler$1.run(Scheduler.java:138)
                                                                             at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
                                                                             at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)聽
                                                                             at java.util.concurrent.FutureTask.run(FutureTask.java:237)聽
                                                                             at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)聽
                                                                             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)聽
                                                                             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)聽
                                                                             at java.lang.Thread.run(Thread.java:761)聽
03-24 17:04:43.045 28908-29045/com.someapp.xmlrx2test E/AndroidRuntime: Caused by: java.lang.ClassNotFoundException: Didn't find class "rx.Observable" on path: DexPathList[[zip file "/data/app/com.someapp.xmlrx2test-1/base.apk", zip file "/data/app/com.someapp.xmlrx2test-1/split_lib_dependencies_apk.apk", zip file "/data/app/com.someapp.xmlrx2test-1/split_lib_slice_0_apk.apk", zip file "/data/app/com.someapp.xmlrx2test-1/split_lib_slice_1_apk.apk", zip file "/data/app/com.someapp.xmlrx2test-1/split_lib_slice_2_apk.apk", zip file "/data/app/com.someapp.xmlrx2test-1/split_lib_slice_3_apk.apk", zip file "/data/app/com.someapp.xmlrx2test-1/split_lib_slice_4_apk.apk", zip file "/data/app/com.someapp.xmlrx2test-1/split_lib_slice_5_apk.apk", zip file "/data/app/com.someapp.xmlrx2test-1/split_lib_slice_6_apk.apk", zip file "/data/app/com.someapp.xmlrx2test-1/split_lib_slice_7_apk.apk", zip file "/data/app/com.someapp.xmlrx2test-1/split_lib_slice_8_apk.apk", zip file "/data/app/com.someapp.xmlrx2test-1/split_lib_slice_9_apk.apk"],nativeLibraryDirectories=[/data/app/com.someapp.xmlrx2test-1/lib/x86_64, /data/app/com.someapp.xmlrx2test-1/base.apk!/lib/x86_64, /data/app/com.someapp.xmlrx2test-1/split_lib_dependencies_apk.apk!/lib/x86_64, /data/app/com.someapp.xmlrx2test-1/split_lib_slice_0_apk.apk!/lib/x86_64, /data/app/com.someapp.xmlrx2test-1/split_lib_slice_1_apk.apk!/lib/x86_64, /data/app/com.someapp.xmlrx2test-1/split_lib_slice_2_apk.apk!/lib/x86_64, /data/app/com.someapp.xmlrx2test-1/split_lib_slice_3_apk.apk!/lib/x86_64, /data/app/com.someapp.xmlrx2test-1/split_lib_slice_4_apk.apk!/lib/x86_64, /data/app/com.someapp.xmlrx2test-1/split_lib_slice_5_apk.apk!/lib/x86_64, /data/app/com.someapp.xmlrx2test-1/split_lib_slice_6_apk.apk!/lib/x86_64, /data/app/com.someapp.xmlrx2test-1/split_lib_slice_7_apk.apk!/lib/x86_64, /data/app/com.someapp.xmlrx2test-1/split_lib_slice_8_apk.apk!/lib/x86_64, /data/app/com.someapp.xmlrx2test-1/split_lib_slice_9_apk.apk!/lib/x86_64, /system/lib64, /vendor/lib64]]
                                                                             at dalvik.system.BaseDexClassLoader.findClass(BaseDexClassLoader.java:56)
                                                                             at java.lang.ClassLoader.loadClass(ClassLoader.java:380)
                                                                             at java.lang.ClassLoader.loadClass(ClassLoader.java:312)
                                                                             at libcore.reflect.InternalNames.getClass(InternalNames.java:53)

@CCorrado create a package in your application called rx, and create an empty class in it called Observable

like this

package rx;

public class Observable {
}

Then it will work

@zhuinden haha yeah that'll work but I mean...eh. wondering what's going on...Thinking maybe during reflection realm is using Rx.Observable somehow

@CCorrado this happens with some reflection-based XML parsers, for example SimpleXML but also Jackson.

I'm not entirely sure why it happens, I just know this is the workaround :smile: it has to do with some sort of "classloading" stuff

Any updates on that problem?
Rx2 + Realm still not works from the box for Realm plugin 3.5.0

@pavel-ismailov RxJava support is being merged into this feature branch currently https://github.com/realm/realm-java/pull/4991

We are done with moving the API's to RxJava2 + add support for changeset observables as well. It will be released with Realm Java 4.0 which currently is scheduled for release in a months time.

@cmelchior

Great

Support has been merged to master in https://github.com/realm/realm-java/pull/4991

Is there a snapshot available for 4.0.0? 4.0.0-BETA2-afe7114-SNAPSHOT (afe7114) does not appear to include RxJava2 support.

SNAPSHOT's should be deployed automatically from our master branch but we are having some CI issues at the moment. Once they are resolved the snapshot should be available.

Here's an article by me about the new RxJava2 support, maybe it helps give some inspiration

http://academy.realm.io/posts/creating-a-reactive-data-layer-with-realm-and-rxjava2/

How to apply this solution- https://stackoverflow.com/questions/38052829/correct-flow-in-rxjava-with-retrofit-and-realm

for RxJava2 asObservable

Facing issues getting latest SNAPSHOTS
https://github.com/realm/realm-java/issues/5267

@cmelchior

Oh man, I'm desperately waiting for Realm to bring out the new version so that we can start using RxJava2!

@LaurieScheepers you can already use RxJava2 using akarnokd/RxJava2Interop

Was this page helpful?
0 / 5 - 0 ratings

Related issues

mithrann picture mithrann  路  3Comments

AlbertVilaCalvo picture AlbertVilaCalvo  路  3Comments

tloshi picture tloshi  路  3Comments

gpulido picture gpulido  路  3Comments

nolanamy picture nolanamy  路  3Comments