Kotlinx.coroutines: Ignoring an exception from a single run with Flow

Created on 25 Sep 2020  路  15Comments  路  Source: Kotlin/kotlinx.coroutines

Ignoring an exception from a single run with Flow

I'm currently in a project where the local Room database exposes Flow to observe database changes.
Say I'm building a chain of operators before collecting the results from the database to shape my data for the user interface.
If any of the statements throw an exception before collecting, is there any way to skip/ignore that single run and continue the flow?

I understand that using the catch() operator allows me to have a fallback (using emit or emitAll) for the entire Flow, not a single run within the Flow. If I use emit, I will collect the value I emitted and the flow completes because it is a single value, definitely not ideal for Room as I would lose notifications. Using emitAll is not suitable for this case as I would need to repeat my entire chain, it would be preferable to use retryWhen. In my case, using the retryWhen operator might be enough, but not ideal. Whenever an exception comes up, a new query is made to the database and the Flow always emits the initial state of the database.

Ideally, I could start the Flow once and if an exception was thrown during the run of a specific value I could simply log it or have a dedicated state for it and keep listening for future emissions. Retry should be optional.

In a shorter example

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*

fun main() {
    runBlocking {
        (0..5).asFlow()
            .onEach { check(it != 3) }
            .collect { println(it) }
    }
}

In this case, how could I ignore the run that throws the exception and move on until number 5? Try catching inside a Flow feels wrong 馃槩

A less predictable example

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*

fun main() {
    runBlocking {
        userRepo.fetchCachedUserIds()
            .map {
                 fetchUserDetails(it) // An exception-prone network call
             }
            .collect { println(it) }
    }
}

In this case, we subscribe to a list of users that is stored locally and wish to enrich the user interface by fetching some details remotely. How can I start this flow only once and handle a possible exception from the network without stop listening for future values?

flow question

All 15 comments

Try catching inside a Flow feels wrong

I don't really agree with this statement, however you can always define another function like fetchUserDetailsOrNull

@fvasco I agree that it is a possible solution, just like try catching. My goal here was to have a centralised place to catch any exception thrown upstream from a single run.
I also recognise that propagating nullable values downstream is not always ideal as I may want to decompose a bigger task into several pieces and then each piece of the chain would have to accommodate for a nullable type because there might have been a previous exception upstream.
Ideally, I should be able to model my Flow chain based on a happy path and treat exceptions to that happy path in a single place, like as if it was a try/catch around the whole run instead of smaller try/catches around every statement.

Hi, @6uilhermeAlmeida,
can you provide an example with this new operator to understand how this should fix this issue better than a try-catch+transform, please?

Yes, I can try. When you say a combination of try-catch + transform do you mean something like this?

    inline fun <T, R> Flow<T>.tryCatch(
        crossinline tryBlock: suspend (value: T) -> R,
        crossinline onFailure: suspend FlowCollector<R>.(throwable: Throwable) -> Unit
    ): Flow<R> = transform { value: T ->
        try {
            val result = tryBlock(value)
            emit(result)
        } catch (t: Throwable) {
            onFailure(t)
        }
    }

This would still need some work but I'll follow along assuming it is something like this, by any means, correct me if I am wrong.
This operator could in fact be useful for some cases, but it is not what I am looking for as this catches only the exceptions inside that one operator.

Imagine the following:

fun main() {
    runBlocking {
        userRepo.fetchCachedUserIds()
            .tryCatch (
                 tryBlock = { fetchUserDetails(it) }, // An exception-prone network call
                 onFailure = { t -> onFetchUsersDetailsError(t) }
             )
            .collect { onSuccess(it) }
    }
}

This would probably be fine as it is a single statement and we are actually in a context from where we can gracefully handle the exception by using onFetchUsersDetailsError.

What if we are not in a context to handle the error directly? Say we have this task encapsulated in a use case:

class FetchUsersDetails (userRepo : UserRepository) {
    operator fun invoke() : Flow<List<UserDetail>> = userRepo.fetchCachedUserIds()
        .map { fetchUserDetails(it) }
}

What is the best approach here? Should I replace map with the tryCatch operator and pass a null value/wrapper object downstream if I have an exception when fetching the user details? What if there are other use cases that depend on this one? They will all have to accommodate for a possible null list/wrapper object even though that state only exists because of a possible exception? I believe we can recognise that a null value is not really a good approach as it does not provide enough information on what happened, for this we could come up with a wrapper class similar to Result for instance.
I am currently not a big fan of _exposing_ these wrapper classes, I prefer to either expose the value itself or an exception. I believe this is currently quite present in the coroutine world and I think it bring a lot more clarity to the code.

So, I imagine a better approach would be to catch any exception thrown upstream by any of the operators just like catch does. But instead of replacing the whole flow, it would just catch the exception for that specific run and keep listening for future emissions (if the exception was not re-thrown). Below you can see a sketch of what it could look like (sorry for the awful name):

fun main() {
    runBlocking {
        fetchUserDetails()
            .catchRun { t -> onFetchUsersDetailsError(t) }
            .collect { onSuccess(it) }
    }
}

It would act and look very similar to the known catch operator, you should be able provide a fallback value for that run through emit (not sure if emitAll would make sense here), simply catch it or even re-throw it.

I'm not sure if this is a valid use case but it's my two cents as I recently had a hard time figuring out the best way to never let a Flow from Android Room complete because of an exception in the chain. The solution I found was the retryWhen operator but, as previously mentioned, is not ideal.

do you mean something like this?

Not exactly, but I am interested in this issue.

What is the proposed signature of catchRun?

@fvasco, I am sorry, I'm not familiar with these terms. What do you mean by proposed signature?

@fvasco If you are asking to propose the prototype for this operator I think it could be something like :
inline fun <T> Flow<T>.catchOnEach(crossinline onError: suspend FlowCollector<T>.(throwable: Throwable) -> Unit): Flow<T>

I am assuming we could emit a fallback value from this operator and that's why I included the FlowCollector, but I don't think emitAll makes sense here. So maybe a new segregation would be needed.

Thank you, @6uilhermeAlmeida,
are you assuming that the Flow this@catchOnEach can throw an Exception for each element?

Can you fix the examples in your first post using this new operator?

In your second example, due an network error, an "UserDetails" may be lost, are you evaluated that this operator should recover this error without any information about the cause (the userDetail instance)?
In other words, why this operator should work better then a simple try+catch?

Yes, according to what I was thinking this operator should be able to re-throw exceptions per run in case the developer feels like the exception should end the entire flow or if it wishes to catch this exception in a further catchOnEach/catch that sits downstream. That part would basically work like it does for the catch operator.

Fixing the examples above would look something like :

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*

fun main() {
    runBlocking {
        (0..5).asFlow()
            .onEach { check(it != 3) }
            .catchOnEach { t ->
                 logError(t)
                 emit(-1) // optionally emit a fallback value if needed
             }
            .collect { println(it) }
    }
}

In this case, we should be able to reach number 5 which we cannot do with catch.
This should print the following numbers from the collect println : 0, 1, 2, -1, 4, 5
In case we did not have the emitcall there for a fallback value, it should print : 0, 1, 2, 4, 5 (basically skipping the run with the exception)

For the second example :

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*

fun main() {
    runBlocking {
        userRepo.fetchCachedUserIds()
            .map {
                 fetchUserDetails(it) // An exception-prone network call
             }
            .catchOnEach { t ->logError(t) } // just catching the exception and logging it, no fallback value here
            .collect { println(it) }
    }
}

Here, assuming fetchCachedUserIds is being periodically pushed with new information when the db values change, in case there is an exception for one of the runs it will simply be logged and the flow does not collect for that value, but is still listening for values that may come later.

Both examples may look silly in this context because we could transform + try-catch where the exception might occur and handle the exception directly on the spot. The problem is when we want to catch an exception that may come from encapsulated behavior which we cannot predict.

Say I want to consume this use case FetchUsersDetails which depends on a second use case GetCachedUserIds:

class FetchUsersDetails (getCachedUserIds : GetCachedUserIds) {
    operator fun invoke() : Flow<List<UserDetail>> = getCachedUserIds()
        .map { fetchUsersDetails(it) }
}

Both FetchUsersDetails and GetCachedUserIds use cases are based on a happy path, their flows expose List\

Using catch:

fun main() {
    runBlocking {
        fetchUserDetails()
            .catch { t -> onFetchUsersDetailsError(t) }
            .collect { onSuccess(it) }
    }
}

With catch, this flow will complete and I'll stop receiving notifications from it (definitely not ideal if you are reacting to changes in a database for instance). If I emit a fallback value, I will collect that value, but then it will complete as well because it is a single value.

I would like to stop the flow from completing because of an exception and be able to either skip the run that threw the exception or provide a fallback value for that run.

Using catchOnEach:

fun main() {
    runBlocking {
        fetchUserDetails()
            .catchOnEach { t -> onFetchUsersDetailsError(t) }
            .collect { onSuccess(it) }
    }
}

In this case, if one of the runs for the fetchUserDetails() Flow threw an exception, we could handle it as we wish and keep listening for future emissions that might possibly be successful.

If for any reason the happy path for a run is not met, we can catch the exceptions for that run in catchOnEach but this time we won't be replacing the entire flow to handle the exception. We can instead catch the exception, optionally emita fallback value for the flawed run, and move on with collecting the next values in the Flow.

Hi @6uilhermeAlmeida,
I suspect that choices like this can clash with the Flow specification (here)

Flow is a sequence of zero or more items (item*), instead you are assuming a Flow can emit (item|exception)*.

Please consider your second example:

userRepo.fetchCachedUserIds()
  .map { fetchUserDetails(it) }
  .catchOnEach { t ->logError(t) }
  .collect { println(it) }

Today I want to implement fetchCachedUserIds as follow

fun fetchCachedUserIds() =
  myCache.getUserIds()
    .onStart { checkPermissionOrThrowAuthorizationException() }

What is the behaviour of second example?

In this case, if one of the runs for the fetchUserDetails() Flow threw an exception, we could handle it as we wish and keep listening for future emissions that might possibly be successful.

I still prefer the plain ugly try+catch option.
If you don't like the try+catch block, then you can consider to avoid throw, too: both doesn't fit well with functional programming.

@fvasco I am not sure I understood your point. I am not assuming a Flow can emit (item|exception)*, I assume a you can emit items and throw exceptions.

The behaviour of the example with your implementation of the use case would be printing that AuthorizationException in case the check failed. I believe that is how catch would behave as well. The only difference is that with catch the Flow would complete and with catchOnEach it would not, unless you re-throw that exception and there is no other catch/catchOnEach downstream to catch it. I believe the operator would be very similar to catch, that's why I am not getting how it disrupts the spec.

@fvasco Should we close this issue? I see there is no further interest in pursuing this operator 馃槩

Hi, @6uilhermeAlmeida,
my previous considerations remain unchanged, I don't have nothing to add.
I still suspect that the current proposal, as I understand it, violates the Flow contract, throwing an Exception terminates the Flow and all subsequent operators must not be invoked (including catchOnEach).

If this answer is enough for you, you can close this issue.
Otherwise, you can consider to examine deeper your proposal, to provide a POC and to explain how this proposal solves a concrete and widespread use case.

@fvasco Thank you for your time 馃憤
Just to be clear, currently, the only way to keep a flow alive is to materialise possible exceptions?

Hi @6uilhermeAlmeida,
I already send you the Flow documentation link and I suggest you to read it carefully.
The answer of your last question is in the first line of the description.

Was this page helpful?
0 / 5 - 0 ratings