The exception didn't throw to parent scope or cancel itself while using SharedFlow with callbackFlow. I tried to cancel sharedflow in case of failure. Here is my code snippet
fun adapt(call: Call<T>): SharedFlow<T> {
val scope = CoroutineScope(dispatcher)
return callbackFlow<T> {
call.enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>, response: Response<T>) {
sendBlocking(response.body())
close()
}
override fun onFailure(call: Call<T>, t: Throwable) {
close(t)
}
})
awaitClose()
}.catch {
/**
*Here caught exception that threw in Failure case
* Try to cancel Job so SharedFlow can terminate but It didn't work However catching exception here won't crash my app
*/
scope.cancel()
}.shareIn(scope, SharingStarted.Lazily, 1)
}
I used it
runBlocking{
//this will get first output in case of success and then completed this coroutine
//But In case of Failure, this will always wait to get the first item and never completed
val result = adapt(someCall).first()
}
Here is another way tried to accomplish the same thing
fun adapt(call: Call<T>): SharedFlow<T> {
val scope = CoroutineScope(dispatcher)
return callbackFlow<T> {
call.enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>, response: Response<T>) {
sendBlocking(response.body())
close()
}
override fun onFailure(call: Call<T>, t: Throwable) {
close(t)
}
})
awaitClose()
}.shareIn(scope, SharingStarted.Lazily, 1).catch {
/**
*This catch block never invoked in case of failure
* Try to cancel Job so sharedFlow can terminate its coroutine
*/
currentCoroutineContext().cancel()
}
}
I used this as previous but this time
runBlocking{
//this will get first output in case of success and then completed this coroutine
//But In case of Failure App crashes
val result = adapt(someCall).first()
}
sharedflow.collect|first()|take(1){} stays always in resume state I want to cancel it in case of failure.
Sorry, but I cannot figure out what you are trying to achieve. Do you run into the problem with catch _before_ shareIn or with catch _after_ shareIn? Can you, please, provide a self-contained reproducer code that demonstrates what you try to do and explain what you have expected this code to do?
@elizarov I added more details could you please check it now?
SharedFlow never completes (see more in the docs).
To achive what you need you can do something like this:
val flow = callbackFlow { ... }
.catch { e ->
// catch before shareIn operator (!!!)
emit(ErrorValue) // emit some special error value to indicate that error happended
}
.shareIn(....)
.takeWhile { it != ErrorValue } // take the flow until there's no error
Because you use takeWhile operator the resulting flow will complete.
Does it help?
P.S. In the future we plan to provide some ready-to-use operators for that. See #2092
Yes this is a nice trick but I wonder why
val flow = callbackFlow { ... }
.shareIn(....)
.catch(...)//this won't work
catch after shareIn didn't catch anything and app crashes.
And
val flow = callbackFlow { ... }
.catch { e ->
scope.cancel()
}
.shareIn(scope,.,.)
Didn't work.
catch on SharedFlow is meaningless, since SharedFlow never completes. I'll add the corresponding Lint checks with warnings to explain it.
Most helpful comment
catchonSharedFlowis meaningless, sinceSharedFlownever completes. I'll add the corresponding Lint checks with warnings to explain it.