Hi, I have a database observable which emits an Android Cursor, and I am mapping this cursor to a POJO. Since a Cursor is a native resource, it needs to be closed, so I am using Observable.using:
val queryObservable: Observable<Cursor>
val contactObservable: Observable<Optional<Contact>> = queryObservable
    .flatMap { cursor ->
        Observable.using(
            { cursor },
            { Observable.fromCallable { optionalOf(mapContact(it)) } },
            { it.close() })
    }
As far as I know, this could break if the stream is disposed right after queryObservable emits (producing the cursor) but before flatMap receives it, which would leak the Cursor, right?
_Is it even possible to have an Observable<Cursor> then?_
The only way I can see this being bulletproof is if the cursor is created inside the first lambda of using, right? But what if it needs to be an Observable, i.e. many cursors over time (when the database changes while that observable is subscribed)?
All examples of using I have seen were about a Socket, where, exactly as I said, the Socket instance was created in that lambda.
This is generally a resource awareness problem that none of the current reactive libraries try to solve.
The usual approach is to have the cursor created synchronously/blockingly in the supplier so that it can be released by using.
Otherwise, the more stages and potential asynchrony there are between the creation of the cursor and its consumption, the more you risk leaks and drops.
You may try having a CompositeDisposable and keep adding/removing disposable cursor records:
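public final class CursorDisposable implements Disposable {
    public final Cursor cursor;

    public CursorDisposable(Cursor cursor) {
        this.cursor = cursor;
    }

    @Override
    public void dispose() {
        cursor.close(); // try-catch if you must
    }

    // Disposable also requires isDisposed(); a closed cursor counts as disposed
    @Override
    public boolean isDisposed() {
        return cursor.isClosed();
    }
}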
CompositeDisposable cursors = new CompositeDisposable();

queryObservable
    .map(cursor -> {
        CursorDisposable cd = new CursorDisposable(cursor);
        cursors.add(cd);
        return cd;
    })
    .flatMap(cd ->
        Observable.fromCallable(() -> optionalOf(mapContact(cd.cursor)))
            // remove() also disposes cd, which closes its cursor
            .doFinally(() -> cursors.remove(cd))
    )
    // closes any cursors still registered when the stream terminates or is disposed
    .doFinally(() -> cursors.dispose());
I've also replied in Kotlinlang Slack (#rx) at https://kotlinlang.slack.com/archives/C0B8Y8BHC/p1541613356023800; cross-posting here for people who have the same question.
@artem_zin:
The easiest way to do that would be to avoid Cursor being emitted:
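contentObservable(resolver, uri, handler)
    .flatMapSingle {
        // flatMapSingle expects a Single, so the query is wrapped in fromCallable
        Single.fromCallable {
            resolver
                .query(uri, projection, selection, selectionArgs, sortOrder)
                .use { cursor -> mapContact(cursor) } // use closes the cursor after mapping
        }
    }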
However, realistically, the chance that a Cursor is emitted but the stream is disposed before the Cursor is consumed is small, and AbstractCursor (which most Cursors extend) closes itself in its finalize method, so while not perfect, it won't leak.
AbstractCursor is part of the Android SDK.
Yes, that's me, thanks for your help.
For SQL Brite and SQL Delight we emit a Query object which is a Cursor
factory. This makes thread changes, caching (whether explicit or implicit
like combineLatest), and use as a resource all trivially work. Generally
the Cursor is only exposed inside a lambda inside map or flatMap.
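A minimal sketch of the "emit a Cursor factory, not a Cursor" idea, in plain Java with no Android or RxJava dependency. Everything here is hypothetical and only for illustration: FakeCursor stands in for android.database.Cursor, and Query and mapOnce are made-up names, not the actual SQL Brite or SQL Delight API. The point is that the emitted object holds no open resource, so dropping it before it is consumed leaks nothing, and the cursor only lives inside the mapping lambda.

```java
import java.util.function.Function;
import java.util.function.Supplier;

public class QueryFactorySketch {

    /** Hypothetical stand-in for android.database.Cursor: a closeable resource. */
    static final class FakeCursor implements AutoCloseable {
        static int openCount = 0; // number of cursors currently open

        final String row;
        private boolean closed;

        FakeCursor(String row) {
            this.row = row;
            openCount++;
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                openCount--;
            }
        }
    }

    /** Hypothetical Query: knows how to open a cursor but does not hold one. */
    static final class Query {
        private final Supplier<FakeCursor> opener;

        Query(Supplier<FakeCursor> opener) {
            this.opener = opener;
        }

        /** Opens a cursor, maps it, and closes it even if the mapper throws. */
        <T> T mapOnce(Function<FakeCursor, T> mapper) {
            try (FakeCursor cursor = opener.get()) {
                return mapper.apply(cursor);
            }
        }
    }

    public static void main(String[] args) {
        // This is the value a stream would emit; nothing is open yet.
        Query query = new Query(() -> new FakeCursor("Alice"));

        // If the stream were disposed here, the Query could simply be dropped.
        String name = query.mapOnce(cursor -> cursor.row);

        System.out.println(name + " open=" + FakeCursor.openCount); // prints "Alice open=0"
    }
}
```

In an Rx chain, the equivalent of mapOnce would be called inside map or flatMap, so thread hops or caching between emission and consumption only ever move the factory around.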