I'd like to create an observable and dynamically push data to it. I create the following code to do it:
Observable
.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
eventSubscriber = subscriber;
}
})
.flatMap(new Func1<Object, Observable<Object>>() {
@Override
public Observable<Object> call(Object Object) {
// perform some transformation.
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object object) {
// perform some action.
}
});
.....
// Other place that generates data
eventSubscriber.onNext(data);
However, the problem is that whenever there's new data pushed to the observable, transformed by the flatMap, and consumed by the subscription, the eventSubscriber will be updated with a new subscriber. Is there a way to avoid this?
Yes, use PublishSubject.
PublishSubject<Integer> ps = PublishSubject.create();
ps.flatMap(v -> transform(v)).subscribe(...);
ps.onNext(data);
Thanks @akarnokd. I tried PublishSubject but it has the effect of creating duplicated events. For example, here's a simplified version of my code:
PublishSubject<Integer> ps = PublishSubject.create();
Observable<Integer> o1;
Observable<Integer> o2;
o1 = ps.flatMap(v -> return o2);
o2 = ps.flatMap(v -> return o1);
o1.subscribe(v -> print(v));
# Generate some data
ps.onNext(1);
ps.onNext(2);
ps.onNext(1);
ps.onNext(2);
# Results:
1
2
1
1
2
2
2
The reason I need to write code like this is for simulating a finite state machine. o1 and o2 are like states and they goes to each other when seeing some events.
I'm not entirely sure what your goal is but if I was going to make a finite state machine I would base it on the scan or reduce operator.
Observable<Input> src;
Observable<State> lastState = src.reduce(INIT_STATE, this::lookupNextStateForCurrentStateAndInput)
if (lastState.toBlocking().single().isTermainal()) {
System.out.println("woo it matched");
} else {
System.out.println("failed to match input");
}
Thanks @abersnaze. I've considered using scan/reduce operators, but flatMap gives me advantage over scan/reduce in terms of code structure. I'll try to illustrate it using the following codes.
Using flatMap:
Observable<Input> events;
Observable<State> startState;
Observable<State> finalState;
startState =
events.flatMap(e -> {
Observable<State> nextState = startState;
if (event is A) {
nextState = finalState;
}
return nextState.startWith(e);
});
finalState =
events.flatMap(e -> {
Observable<State> nextState = finalState;
if (event is C) {
nextState = startState;
}
return nextState.startWith(e);
});
startState.subscribe(e -> {
if (event is A) {
// do something with A
} else if (event is B) {
// do something with B
} else if (event is C) {
// do something with C
}
});
Using reduce:
Observable<Input> events;
Observable<State> states =
events.reduce(START_STATE, (curState, event) -> {
State nextState = curState;
if (curState == START_STATE) {
if (event is A) {
nextState = FINAL_STATE;
// do something with A
} else if (event is B) {
// do something with B
}
} else if (curState == FINAL_STATE) {
if (event is C) {
nextState = START_STATE;
// do something with C
}
}
});
The difference is that, with the flatMap approach, I could separate state transition and event handling. The piece of codes looks modular and flat. While with the reduce approach, things are mixed up. Especially in my case where there are a lot of states and events. The codes are harder to read.
I'm closing this issue due to inactivity. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.
Most helpful comment
Yes, use
PublishSubject.