Kotlinx.coroutines: SharedFlow didn't cancel or throw exception with callbackFlow

Created on 5 Nov 2020 · 5 Comments · Source: Kotlin/kotlinx.coroutines

When using SharedFlow with callbackFlow, the exception is neither propagated to the parent scope nor does the flow cancel itself. I tried to cancel the SharedFlow in case of failure. Here is my code snippet:

fun adapt(call: Call<T>): SharedFlow<T> {
        val scope = CoroutineScope(dispatcher)
        return callbackFlow<T> {
            call.enqueue(object : Callback<T> {
                override fun onResponse(call: Call<T>, response: Response<T>) {
                    sendBlocking(response.body())
                    close()
                }

                override fun onFailure(call: Call<T>, t: Throwable) {
                    close(t)
                }
            })
            awaitClose()
        }.catch {
            /**
             * The exception thrown in the failure case is caught here.
             * I try to cancel the Job so that the SharedFlow can terminate, but that doesn't work.
             * Catching the exception here does keep the app from crashing, though.
             */
            scope.cancel()
        }.shareIn(scope, SharingStarted.Lazily, 1)
    }

I use it like this:

runBlocking {
    // In the success case this returns the first value and the coroutine completes.
    // In the failure case it waits for the first item forever and never completes.
    val result = adapt(someCall).first()
}

Here is another way I tried to accomplish the same thing:

fun adapt(call: Call<T>): SharedFlow<T> {
        val scope = CoroutineScope(dispatcher)
        return callbackFlow<T> {
            call.enqueue(object : Callback<T> {
                override fun onResponse(call: Call<T>, response: Response<T>) {
                    sendBlocking(response.body())
                    close()
                }

                override fun onFailure(call: Call<T>, t: Throwable) {
                    close(t)
                }
            })
            awaitClose()
        }.shareIn(scope, SharingStarted.Lazily, 1).catch {
            /**
             * This catch block is never invoked in the failure case.
             * I try to cancel the Job so that the SharedFlow can terminate its coroutine.
             */
              currentCoroutineContext().cancel()
        }
    }

I used it the same way as before, but this time:

runBlocking {
    // In the success case this returns the first value and the coroutine completes.
    // In the failure case the app crashes.
    val result = adapt(someCall).first()
}

Collecting the SharedFlow with collect {}, first(), or take(1) never completes. I want to cancel it in case of failure.

flow-sharing waiting for clarification

All 5 comments

Sorry, but I cannot figure out what you are trying to achieve. Do you run into the problem with catch _before_ shareIn or with catch _after_ shareIn? Can you please provide self-contained reproducer code that demonstrates what you are trying to do, and explain what you expected this code to do?

@elizarov I added more details. Could you please check it now?

SharedFlow never completes (see more in the docs).
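
The statement above can be checked with a minimal sketch, assuming a recent kotlinx.coroutines release on the classpath. The helper `completesWithin` is a hypothetical name introduced only for this illustration; it reports whether collecting a flow finishes on its own before a timeout.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: true if collecting `flow` finishes on its own within `timeoutMillis`.
suspend fun <T> completesWithin(flow: Flow<T>, timeoutMillis: Long): Boolean =
    withTimeoutOrNull(timeoutMillis) {
        flow.collect { }
        true
    } ?: false

fun main() = runBlocking {
    // Separate scope for sharing, so it can be cancelled independently of runBlocking.
    val scope = CoroutineScope(Dispatchers.Default)

    val cold = flowOf(1, 2, 3)
    val shared = cold.shareIn(scope, SharingStarted.Lazily, replay = 1)

    // The cold upstream is finite, so collecting it completes.
    println(completesWithin(cold, 500))   // true
    // The shared flow does not forward upstream completion, so the collector hits the timeout.
    println(completesWithin(shared, 500)) // false

    scope.cancel()
}
```

The same applies to upstream failures: they end the sharing coroutine, not the collectors, which is why a terminal operator such as first() keeps waiting.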

To achieve what you need, you can do something like this:

val flow = callbackFlow { ... }
    .catch { e ->
        // catch before the shareIn operator (!!!)
        emit(ErrorValue) // emit a special error value to signal that an error happened
    }
    .shareIn(....)
    .takeWhile { it != ErrorValue } // keep collecting until the error value arrives

Because you use the takeWhile operator, the resulting flow will complete once the error value is emitted.

Does it help?

P.S. In the future we plan to provide some ready-to-use operators for that. See #2092
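
A runnable variation on the suggestion above, offered as a sketch rather than as the library's recommended pattern: instead of a single sentinel `ErrorValue`, the upstream is wrapped in a hypothetical `Outcome` type so that the failure travels through shareIn as a regular element and can be rethrown on the collector side. The names `Outcome`, `shareOutcomes`, and `firstOrThrow` are assumptions made for this example and are not part of kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical wrapper: either a value or the failure that ended the upstream.
sealed class Outcome<out T> {
    data class Value<T>(val value: T) : Outcome<T>()
    data class Failure(val cause: Throwable) : Outcome<Nothing>()
}

// Converts an upstream failure into an element *before* shareIn, then shares the result.
fun <T> Flow<T>.shareOutcomes(scope: CoroutineScope): SharedFlow<Outcome<T>> =
    this.map<T, Outcome<T>> { Outcome.Value(it) }
        .catch { e -> emit(Outcome.Failure(e)) }
        .shareIn(scope, SharingStarted.Lazily, replay = 1)

// Returns the first shared value, or rethrows the upstream failure in the collector.
suspend fun <T> SharedFlow<Outcome<T>>.firstOrThrow(): T =
    when (val outcome = first()) {
        is Outcome.Value -> outcome.value
        is Outcome.Failure -> throw outcome.cause
    }

fun main() = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)

    val ok = flow { emit(42) }.shareOutcomes(scope)
    println(ok.firstOrThrow()) // 42

    val failing = flow<Int> { throw IllegalStateException("boom") }.shareOutcomes(scope)
    val attempt = runCatching { failing.firstOrThrow() }
    println(attempt.exceptionOrNull()?.message) // boom

    scope.cancel()
}
```

To end a long-running collection instead of taking a single item, `takeWhile { it is Outcome.Value }` plays the role of the `takeWhile { it != ErrorValue }` step from the suggestion.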

Yes, this is a nice trick, but I wonder why this doesn't work:

val flow = callbackFlow { ... }
    .shareIn(....)
    .catch(...) // this won't work

catch after shareIn didn't catch anything, and the app crashed.
And this:

val flow = callbackFlow { ... }
    .catch { e ->
        scope.cancel()
    }
    .shareIn(scope,.,.)

didn't work either.

catch on SharedFlow is meaningless, since SharedFlow never completes. I'll add the corresponding Lint checks with warnings to explain it.
