Kotlinx.coroutines: wrapping callbacks that fire more than once

Created on 30 Mar 2017 · 8 Comments · Source: Kotlin/kotlinx.coroutines

I am trying to convert some Firebase code on Android to use coroutines. Some Firebase callbacks are a bit odd in that they sometimes fire more than once. For example, to get the download URL from Firebase Storage I have written this:

    private suspend fun getFileDownloadUrlAsync(file_sref: StorageReference): Uri = suspendCoroutine { c ->
        with(file_sref.downloadUrl) { //<---- call to Firebase getDownloadUrl()
            addOnCompleteListener { //<--- this callback can trigger more than once!
                if (it.isSuccessful) {
                    c.resume(it.result)
                } else
                    c.resumeWithException(Exception("some error"))
            }
            addOnFailureListener {
                c.resumeWithException(it)
            }
        }
    }

Since the completion listener is triggered more than once, I get an IllegalStateException("Already resumed").
As a workaround I have defined:

    class WrappedContinuation<T>(val c: Continuation<T>) : Continuation<T> {
        var isResolved = false
        override val context: CoroutineContext
            get() = c.context

        override fun resume(value: T) {
            if (!isResolved) {
                isResolved = true
                c.resume(value)
            }
        }

        override fun resumeWithException(exception: Throwable) {
            if (!isResolved) {
                isResolved = true
                c.resumeWithException(exception)
            }
        }

    }

    public inline suspend fun <T> suspendCoroutineW(crossinline block: (WrappedContinuation<T>) -> Unit): T =
            suspendCoroutine { c ->
                val wd = WrappedContinuation(c)
                block(wd)
            }

and I am using suspendCoroutineW instead of suspendCoroutine. Would it be possible to
modify the SafeContinuation class in the library to offer similar functionality and expose it to the user, since it already tracks whether the continuation has been resumed?
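
For readers on the current stable `kotlin.coroutines` API, where `Continuation` has a single `resumeWith(result)` method, the same resume-once idea can be sketched as follows. This is a minimal illustration, not library code: `OnceContinuation`, `suspendCoroutineOnce`, `fireTwice` and `runOnceDemo` are hypothetical names, and an `AtomicBoolean` is assumed so that the check-and-mark step also holds when callbacks arrive on different threads.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical helper: forwards only the first resumption and drops later ones.
class OnceContinuation<T>(private val delegate: Continuation<T>) : Continuation<T> {
    private val resumed = AtomicBoolean(false)

    override val context: CoroutineContext
        get() = delegate.context

    override fun resumeWith(result: Result<T>) {
        // compareAndSet makes "check, then mark as resumed" a single atomic step.
        if (resumed.compareAndSet(false, true)) delegate.resumeWith(result)
    }
}

// Hypothetical counterpart of suspendCoroutineW from the question.
suspend fun <T> suspendCoroutineOnce(block: (Continuation<T>) -> Unit): T =
    suspendCoroutine { c -> block(OnceContinuation(c)) }

// Stand-in for a callback API that invokes its listener twice.
fun fireTwice(listener: (String) -> Unit) {
    listener("first")
    listener("second")
}

// Runs the suspending body with the standard library only and returns its result.
fun runOnceDemo(): String? {
    var outcome: String? = null
    val body: suspend () -> String = {
        suspendCoroutineOnce { c -> fireTwice { c.resume(it) } }
    }
    body.startCoroutine(Continuation<String>(EmptyCoroutineContext) { result ->
        outcome = result.getOrNull()
    })
    return outcome
}
```

Calling `runOnceDemo()` yields the value from the first callback; the second invocation is silently dropped instead of raising "Already resumed".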

All 8 comments

After the first "complete" event the completion listener is no longer useful, so you should remove it.

IMHO this isn't a Continuation issue.

Unfortunately I could not find a way to remove the listener.

OK, so I found the solution: I should have used suspendCancellableCoroutine.

I'm curious what difference suspendCancellableCoroutine makes for you, and what exactly is the story with Firebase invoking the callback more than once? In what scenarios can this happen?

suspendCancellableCoroutine builds a CancellableContinuation that exposes (among other things) the two methods

    public fun tryResume(value: T, idempotent: Any? = null): Any?
    public fun tryResumeWithException(exception: Throwable): Any?

They basically do what I want: they check the continuation's state (which I cannot access directly because it is internal) to see whether it has already been resumed.
About Firebase: for that specific API (getFileDownloadUrl) the callback is almost always called twice. Other Firebase APIs behave the same way. I don't know the cause. It is documented behavior, but I never understood why it should happen at all. That is simply how it works.
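
As a rough illustration of that approach, the sketch below guards the resume with the public `isActive` property rather than `tryResume`, which, as far as I know, is marked as internal API in current kotlinx.coroutines releases. `fetchUrl`, `awaitUrl` and `awaitUrlBlocking` are hypothetical names, and `fetchUrl` is only a stand-in for a Firebase-style call whose listener fires twice.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical stand-in for a Firebase-style API whose listener fires twice.
fun fetchUrl(onComplete: (url: String?, error: Exception?) -> Unit) {
    repeat(2) { onComplete("https://example.invalid/file", null) }
}

suspend fun awaitUrl(): String = suspendCancellableCoroutine { cont ->
    fetchUrl { url, error ->
        // isActive turns false once the continuation has been resumed or cancelled,
        // so the second invocation returns here instead of resuming again.
        if (!cont.isActive) return@fetchUrl
        when {
            error != null -> cont.resumeWithException(error)
            url != null -> cont.resume(url)
            else -> cont.resumeWithException(IllegalStateException("no result and no error"))
        }
    }
}

// Blocks the calling thread until the first callback result arrives.
fun awaitUrlBlocking(): String = runBlocking { awaitUrl() }
```

The check and the resume are two separate steps, so this guard assumes the callbacks are delivered one after another on a single thread. If they can race on different threads, an atomic flag around the resume is the safer choice.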

Thanks. Can you point me to the place where Firebase documents multiple callback invocations, please? I want to understand the root cause of it.

for example here

@elizarov, this StackOverflow post outlining a Firebase realtime snapshot listener using coroutines is a great example.

I've used the following pattern.

Realtime updates

The extension function awaitRealtime checks, among other things, that the continuation is still active (isActive) before resuming it. This is important because the callback is invoked again whenever the user's main feed of content is updated, whether by a lifecycle event, a manual refresh of the feed, or the removal of content from the feed. Without this check the app crashes.

_ExtensionFunction.kt_

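    // Resumes with the first non-empty snapshot. Listener errors are not propagated here.
    suspend fun Query.awaitRealtime() = suspendCancellableCoroutine<QuerySnapshot?> { cont ->
        addSnapshotListener { value, error ->
            // The listener keeps firing after the first resume, so check isActive first.
            if (error == null && cont.isActive && value != null && !value.isEmpty) {
                cont.resume(value)
            }
        }
    }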

Errors are handled with try/catch. In a future version the repository's LiveData will be refactored to use Flow: the ViewModel will observe the Flows and pass data to the UI through the existing LiveData states.
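
As an aside on the Flow refactoring mentioned above: a listener that legitimately fires many times maps more naturally onto `callbackFlow` than onto a single continuation. The sketch below is not the repository's code. `SnapshotSource`, `snapshots` and `firstTwoSnapshots` are hypothetical, and the source delivers its values synchronously on registration only to keep the example deterministic.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a realtime source: it invokes the listener once per
// value on registration and returns a function that removes the listener.
class SnapshotSource(private val values: List<String>) {
    var listenerCount = 0
        private set

    fun addListener(listener: (String) -> Unit): () -> Unit {
        listenerCount++
        values.forEach(listener)
        return { listenerCount-- }
    }
}

// Every callback invocation becomes one emitted element instead of a second resume.
fun SnapshotSource.snapshots(): Flow<String> = callbackFlow {
    val remove = addListener { value -> trySend(value) }
    // Runs when the collector finishes or is cancelled, so the listener is removed.
    awaitClose { remove() }
}

// Collects only the first two snapshots, then stops collecting.
fun firstTwoSnapshots(): List<String> = runBlocking {
    SnapshotSource(listOf("a", "b", "c")).snapshots().take(2).toList()
}
```

With this shape there is no continuation to resume twice, and `awaitClose` gives the listener removal suggested earlier in the thread a natural home.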

_Repository.kt_
```Kotlin
object ContentRepository {
    fun getMainFeedList(scope: CoroutineScope, isRealtime: Boolean, timeframe: Timestamp) =
            liveData<Lce<PagedListResult>>(scope.coroutineContext) {
                val lce = this
                lce.emit(Loading())
                val labeledSet = HashSet<String>()
                val user = getUesrCall(...)
                getLabeledContent(user, timeframe, labeledSet, SAVE_COLLECTION, lce)
                getLoggedInNonRealtimeContent(timeframe, labeledSet, lce)
            }

// Realtime updates with 'awaitRealtime' used
private suspend fun getLabeledContent(user: CollectionReference, timeframe: Timestamp,
                                      labeledSet: HashSet<String>,
                                      saveCollection: String,
                                      lce: LiveDataScope<Lce<PagedListResult>>) =
        try {
            val list = ArrayList<Content?>()
            user.document(COLLECTIONS_DOCUMENT)
                    .collection(saveCollection)
                    .orderBy(TIMESTAMP, DESCENDING)
                    .whereGreaterThanOrEqualTo(TIMESTAMP, timeframe)
                    .awaitRealtime()?.documentChanges?.map { doc ->
                doc.document.toObject(Content::class.java).let { content ->
                    list.add(content)
                    labeledSet.add(content.id)
                }
            }
            database.contentDao().insertContentList(list)
        } catch (error: FirebaseFirestoreException) {
            lce.emit(Error(PagedListResult(null,
                    "Error retrieving user save_collection: " +
                            "${error.localizedMessage}")))
        }

// One time updates with 'await' used
private suspend fun getLoggedInNonRealtimeContent(timeframe: Timestamp,
                                                  labeledSet: HashSet<String>,
                                                  lce: LiveDataScope<Lce<PagedListResult>>) {
        try {
            val list = ArrayList<Content?>()
            contentEnCollection.orderBy(TIMESTAMP, DESCENDING)
                .whereGreaterThanOrEqualTo(TIMESTAMP, timeframe).get().await()
                .documentChanges
                ?.map { change -> change.document.toObject(Content::class.java) }
                ?.filter { content -> !labeledSet.contains(content.id) }
                ?.map { content -> list.add(content) }
            database.contentDao().insertContentList(list)
            lce.emit(Lce.Content(PagedListResult(
                queryMainContentList(timeframe), "")))
        } catch (error: FirebaseFirestoreException) {
            lce.emit(Error(PagedListResult(
                null,
                CONTENT_LOGGED_IN_NON_REALTIME_ERROR +
                        "${error.localizedMessage}")))
        }
    }

}
```
