[ rxjava 2.1.0 ]
Hi I just finish watching video about managing state by Jake Wharton. I got a problem though,
fun loadFirstPage(): ObservableTransformer<LoadFirstPageAction, MainResult> {
return ObservableTransformer {
actionStream -> actionStream.flatMap { _
-> mainModel.getNews("", "10") // <-- whats wrong with you! took
// 1 day of my life
.map { data -> MainResult.success(data) }
.onErrorReturn { error -> MainResult.failure(error.message ?: "Unknown error") }
.observeOn(AndroidSchedulers.mainThread())
.startWith { MainResult.loading()} }
}
}
fun loadNextPage(): ObservableTransformer<LoadNextPageAction, MainResult> {
...
}
fun actionToResultTransformer(): ObservableTransformer<MainAction, MainResult> {
return ObservableTransformer {
actionStream -> actionStream.publish { shared -> Observable.merge(
shared.ofType(LoadFirstPageAction::class.java).compose(loadFirstPage()),
shared.ofType(LoadNextPageAction::class.java).compose(loadNextPage())) }
}
}
When firing a LoadFirstPageAction, loadFirstPage() gets executed but the observable is not starting.
mainModel.getNews("", "10") is a retrofit api.
I know this post is a nono on the guidelines, but I cant find the same issue in the internet.
Does mainModel.getNews("", "10") work on its own? If so, you could add doOnNext(System.out::println) to various places to see where the data stops propagating.
On what do you apply the actionToResultTransformer? Do you subscribe to the result of this transformer?
mainModel.getNews("", "10") I am 100% sure this works.
I put a log in mainModel.getNews("", "10") and shows the log, but stops there.
actionToResultTransformer I am composing it and subscribing to it.
Maybe you get a silent NetworkOnMainThread? You can use doOnEach to print out the other types of signals.
But Im putting a logging interceptor. It is not hitting the server, not even trying to request.
fun getNews(after: String, limit: String): Observable<RedditNewsResponse> {
Log.d("mainModel", "get news called") //<--- this gets called
return redditApi.getTop(after, limit) //<--- but data stops propagating here
}
Add the following:
return redditApi.getTop(after, limit)
.doOnSubscribe(s -> Log.d("mainModel", "get news subscribed"))
.doOnEach(e -> Log.d("mainModel", "getTop: " + e))
Also please make sure you are using the most recent Retrofit.
Everything works except this one
fun loadFirstPage(): ObservableTransformer<LoadFirstPageAction, MainResult> {
return ObservableTransformer {
actionStream -> actionStream.flatMap { _ -> mainModel.getNews("", "10") // <-- whats wrong with you! took 1 day of my life
.map { data -> MainResult.success(data) }
.onErrorReturn { error -> MainResult.failure(error.message ?: "Unknown error") }
.observeOn(AndroidSchedulers.mainThread())
.startWith { MainResult.loading()} }
}
}
Okay Ill do what you said.
Hi
doOnSubscribe{s -> Log.d("mainModel", "get news subscribed")}
is not called. No error also.
I think this can help. My whole presenter.
class MainPresenter(private var mainModel: MainModel,
private var mainView: MainView) {
private val TAG: String = "MainPresenter"
private val intentSubject: PublishSubject<MainIntent> = PublishSubject.create()
private val stateSubject: BehaviorSubject<MainState>
= BehaviorSubject.createDefault(MainState.initial())
private val disposables: CompositeDisposable = CompositeDisposable()
fun loadFirstPage(): ObservableTransformer<LoadFirstPageAction, MainResult> {
return ObservableTransformer {
actionStream -> actionStream.flatMap { _ -> mainModel.getNews("", "10") // <-- whats wrong with you! took 1 day of my life
.map { data -> MainResult.success(data) }
.onErrorReturn { error -> MainResult.failure(error.message ?: "Unknown error") }
.observeOn(AndroidSchedulers.mainThread())
.startWith { MainResult.loading()} }
}
}
fun loadNextPage(): ObservableTransformer<LoadNextPageAction, MainResult> {
return ObservableTransformer {
actionStream -> actionStream.flatMap {(after) -> mainModel.getNews(after, "10")
.map { data -> MainResult.success(data) }
.onErrorReturn { error -> MainResult.failure(error.message ?: "Unknown error") }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.startWith { MainResult.loading()} }
}
}
fun actionToResultTransformer(): ObservableTransformer<MainAction, MainResult> {
return ObservableTransformer {
actionStream -> actionStream.publish { shared -> Observable.merge(
shared.ofType(LoadFirstPageAction::class.java).compose(loadFirstPage()),
shared.ofType(LoadNextPageAction::class.java).compose(loadNextPage())) }
}
}
init {
intentSubject.map { intent -> intent.toAction()}
.compose(actionToResultTransformer())
.scan(MainState.initial(), { prevState, result ->
when(result.state) {
MainResult.State.LOADING -> MainState.loading(prevState)
MainResult.State.SUCCESS -> MainState.success(prevState, result)
MainResult.State.FAILURE -> MainState.failure(prevState, result.error)
}
}).subscribe{ state -> stateSubject.onNext(state) }
}
fun reattach(mainModel: MainModel, mainView: MainView) {
this.mainModel = mainModel
this.mainView = mainView
Log.d(TAG, "reattaching model and view")
}
fun onCreate() {
disposables.add(observeState())
disposables.add(onObserveRecyclerViewItemClick())
disposables.add(bindViewIntentSource())
}
fun onDestroy() {
disposables.clear()
}
fun bindViewIntentSource(): Disposable {
return mainView.intents().subscribe {
intent -> run {
Log.d("intent", "inside presenter")
intentSubject.onNext(intent)
}
}
}
fun observeState(): Disposable {
return stateSubject.subscribe {
state -> mainView.render(state)
}
}
fun onObserveRecyclerViewItemClick(): Disposable {
return mainView.observeFeedClicked()
.subscribe { data -> mainModel.startBrowserActivity(data.data) }
}
}
Okay, please add .subscribeOn(Schedulers.io()) just after the redditAPI.getTop() call.
Still doOnSubscribe{s -> Log.d("mainModel", "get news subscribed")} is not called.
Do I need to subscribe to loadFirstPage() transformer somewhere even if its part of actionToResultTransformer()?
No, the composition takes care of that.
Next thing to try: please add doOnDispose(s -> Log.d("mainModel", "disposed")) after doOnSubscribe to see if something gets you disposed immediately. Also please add error handlers to your subscribe calls.
doOnDispose(s -> Log.d("mainModel", "disposed")) is also not called.
added error handler in my subscribe.
Can you post the entire code so I can run it and see for myself?
Okay Ill push my code
Not in this issue of course but as a Github project?
Hi, I updated my repo now. https://github.com/novodimaporo/LearningRxJavaKotlin
I know what's wrong:
.startWith ( MainResult.loading() )
By using curly braces with startWith, you actually created a no-op Observable instead of specifying a constant initial value. With the changes above, the app works for me.
Oh c'mon! How treacherous kotlin can be. I could cry a lot. Thank you for your help Mr David, I appreciate it a lot. Ill be careful with braces now. semi-colon gone braces comes.
Great! I'll now look out for such misleading uses of curly braces too.
By the way Mr David, why was it written that way?
Whats the purpose of .startWith { MainResult.loading() } then?
That's an artifact of Kotlin that tricks you to write a lambda for a functional interface type. In Java, that overload is there for starting with another Observable:
Observable.range(1, 5).startWith(Observable.just(1)).subscribe(System.out::println);
I get it now, its a cool feature.
Hello, I am facing the same kind of problem. Not able to use sealed class with ObservableTransFormer
var actionProcessor: ObservableTransformer<MainActions, MainResult> =
ObservableTransformer { actions: Observable<MainActions> ->
actions.publish { shared ->
shared.ofType(MainActions.LoadSongsAction::class.java).compose(loadSongsProcessor)
}
}
private val loadSongsProcessor: ObservableTransformer<MainActions, MainResult> =
ObservableTransformer { actions: Observable<MainActions.LoadSongsAction> ->
actions.switchMap {
service.fetchSongs()
.subscribeOn(Schedulers.io())
.map { songs -> MainResult.LoadTaskResult.Success(songs) }
.onErrorReturn { error -> MainResult.LoadTaskResult.Failure(error.message ?: "Unknown error") } //<--compilation error should return MainResult.LoadTaskResult.Success only
.observeOn(AndroidSchedulers.mainThread())
.startWith(MainResult.LoadTaskResult.InFlight) //<--compilation error
}
}
Anyone can please help me out with this.
Most helpful comment
I know what's wrong:
By using curly braces with
startWith, you actually created a no-opObservableinstead of specifying a constant initial value. With the changes above, the app works for me.