I run into problem when creating Observable with:
Observable.create(new ObservableOnSubscribe())
When new ObservableOnSubscribe implements Disposable it is expected to call dispose() method to release some resources, that is not happening.
Observable created with Observable.create(..) doesn't implement Disposable and the upstream clean up isn't passing through to ObservableOnSubscribe.
I am open to discussion and suggest small change with:
Observable.java add method:
public static <T> Observable<T> createDisposable(DisposableObservableOnSubscribe<T> observableOnSubscribe) {
return Observable.create(observableOnSubscribe).doOnDispose(observableOnSubscribe::dispose);
}
with adding new Interface of DisposableObservableOnSubscribe:
public interface DisposableObservableOnSubscribe<T> extends ObservableOnSubscribe<T>, Disposable {
}
Or doing it other way - like I did on: gist
I didn't found something that would solve this use case.
Did you call emitter.setDisposable(this)?
Here's a simpler implementation:
public final class RxBroadcastReceiver implements ObservableOnSubscribe<Intent> {
public static Observable<Intent> create(Context context, IntentFilter intentFilter) {
return Observable.create(new RxBroadcastReceiver(context, intentFilter));
}
private final Context context;
private final IntentFilter intentFilter;
private RxBroadcastReceiver(Context context, IntentFilter intentFilter) {
this.context = context;
this.intentFilter = intentFilter;
}
@Override
public void subscribe(ObservableEmitter<Intent> emitter) throws Exception {
final BroadcastReceiver broadcastReceiver = new BroadcastReceiver() {
@Override
public void onReceive(Context context, Intent intent) {
emitter.onNext(intent);
}
}
context.registerReceiver(broadcastReceiver, intentFilter);
emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
@Override
public void run() {
context.unregisterReceiver(broadcastReceiver);
}
}));
}
}
Thanks, I think that is something I can use - not tested yet.
Also I realized that multiply subscriptions to same RxBroadcastReceiver will not work correctly (on my example), which is fixed in yours simplified version with having emitteras local parameter only - only last one will get emits. It is not case where I use it right now but I plan to update this in future.
I plan to extend this to not create and register BroadcastReceiver on every subscription only on first one and unregister on last one.
Don't bother with that. Just call share() on the returned instance. It will reference count the underlying subscription so that it only remains when more than 1 are subscribed.
It worked like a charm, thank you Jake.
Most helpful comment
Here's a simpler implementation: