Kotlinx.coroutines: Allow callbackFlow to specify capacity

Created on 3 Sep 2019 · 5 comments · Source: Kotlin/kotlinx.coroutines

I just wrote the following flow to observe changes in a preference.

val sharedPrefs: SharedPreferences
val key: String
val defaultValue: Boolean

fun observe(): Flow<Boolean> = callbackFlow {
    @Suppress("ObjectLiteralToLambda")
    val listener = object : SharedPreferences.OnSharedPreferenceChangeListener {
        override fun onSharedPreferenceChanged(prefs: SharedPreferences, changedKey: String) {
            // the parameter is renamed so it does not shadow the outer `key` property
            if (key == changedKey) {
                // I want to guarantee that this `.offer(..)` call emits
                offer(prefs.getBoolean(key, defaultValue))
            }
        }
    }
    send(sharedPrefs.getBoolean(key, defaultValue))
    sharedPrefs.registerOnSharedPreferenceChangeListener(listener)
    awaitClose {
        sharedPrefs.unregisterOnSharedPreferenceChangeListener(listener)
    }
}

I would like to specify a channel capacity of Channel.UNLIMITED to guarantee that .offer(..) will always succeed, but the current default is Channel.BUFFERED, and there is no way to specify a different capacity on the builder.

Another option would be to use Channel.UNLIMITED as the default, as it's "safer".

question

All 5 comments

Actually, upon further look: what's the difference between callbackFlow and channelFlow? They seem to be the same, just that callbackFlow is channelFlow with default params.

Call .buffer(UNLIMITED), per the callbackFlow docs:

A channel with default buffer size is used. Use buffer operator on the resulting flow to specify a value other than default and to control what happens when data is produced faster than it is consumed, that is to control backpressure behavior.

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/callback-flow.html

Huuuuge -1 to unlimited being a good default though.
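
To make that workaround concrete, here is a minimal sketch (not part of the thread) that applies the buffer operator to the observe() builder from the original post; the val name is made up. Because buffer fuses with the channel that callbackFlow creates, the capacity given here is the one offer(..) runs against inside the builder.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.buffer

// Sketch only: give the callbackFlow an unlimited buffer so offer(..)
// inside the builder can no longer fail because the buffer is full.
val unbounded = observe().buffer(Channel.UNLIMITED)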

I still don't understand the difference between callbackFlow and channelFlow - the only difference seems to be the noinline keyword. Why would I use one over the other?

Huuuuge -1 to unlimited being a good default though.

What makes BUFFERED a better default? I see RENDEZVOUS being on one extreme and UNLIMITED being on the other - but why have BUFFERED as a middle ground?

In this specific case, you probably want conflated. If someone misses a value change of a preference, they likely just want to get the latest value, not all of the value changes that occurred since the last one.
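
A minimal sketch (not from the thread) of that conflated variant, again assuming the observe() builder from the original post: a slow collector only ever sees the most recent preference value, and offer(..) always succeeds because a new value simply replaces the buffered one.

import kotlinx.coroutines.flow.conflate

// Sketch only: intermediate changes are dropped by design rather than queued;
// the collector always gets the latest value.
val latestOnly = observe().conflate()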

But in general, unlimited buffers are how you get cascading failures rather than localized ones (or potentially none at all). Unlimited queues are very hard to recover from when the producer is faster than the consumer because there's neither backpressure being applied nor points where you can shed load (because how would you even know?).

Especially in the design of Flow, backpressure is handled naturally and arguably transparently inside the system, such that producers can't outrun consumers. At the points where you bridge in and out of the suspension world, there are still signals, like the return value of offer, or runBlocking and actual caller-blocking, which you can use to try to slow (or stop) your producer. Too-small or too-large buffers are things you can tweak over time based on actual usage patterns, but every unlimited buffer is just a time bomb for a crash and/or outage.
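
As an illustration of that bridging point (not from the thread), a producer can treat offer's boolean result as the load-shedding signal; the extension function name below is made up.

import kotlinx.coroutines.channels.ProducerScope

// Sketch only: react to a full buffer instead of queueing without bound.
fun ProducerScope<Int>.onCallbackValue(value: Int) {
    val accepted = offer(value)   // trySend(value).isSuccess in newer library versions
    if (!accepted) {
        // The collector is behind and the buffer is full: shed the value
        // (or count/log it) so the overload stays visible and localized.
        println("dropped $value because the collector is behind")
    }
}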

@ZakTaccardi There is no difference between callbackFlow and channelFlow. They are the same. They just have different names to tailor their docs to the specific use case and to let the code using them convey your intention. E.g., when I read your code I would immediately see what you are planning to do just by the name of the flow builder you are using.
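
To make that concrete, here is a sketch (not from the thread) of the two builders used interchangeably; both function names and the register parameter are placeholders. Only the builder name differs, and it tells the reader whether values arrive from an external callback or are produced in place.

import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.channelFlow

// Sketch only: values pushed from an external callback.
fun callbackTicks(register: ((Int) -> Unit) -> Unit): Flow<Int> = callbackFlow {
    register { offer(it) }
    awaitClose { /* unregister the callback here */ }
}

// Sketch only: values produced directly inside the builder.
fun generatedTicks(): Flow<Int> = channelFlow {
    repeat(3) { send(it) }
}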
