I just wrote the following flow to observe changes in a preference.
val sharedPrefs: SharedPreferences
val key: String
val defaultValue: Boolean

fun observe(): Flow<Boolean> = callbackFlow {
    @Suppress("ObjectLiteralToLambda")
    val listener = object : SharedPreferences.OnSharedPreferenceChangeListener {
        // The parameter is named `changedKey` so that it does not shadow the outer `key` property.
        override fun onSharedPreferenceChanged(prefs: SharedPreferences, changedKey: String) {
            if (key == changedKey) {
                // I want to guarantee that this `.offer(..)` call emits
                offer(prefs.getBoolean(key, defaultValue))
            }
        }
    }
    // Emit the current value first, then start listening for changes.
    send(sharedPrefs.getBoolean(key, defaultValue))
    sharedPrefs.registerOnSharedPreferenceChangeListener(listener)
    awaitClose {
        sharedPrefs.unregisterOnSharedPreferenceChangeListener(listener)
        listeners.remove(listener)
    }
}
I would like to specify a channel capacity of Channel.UNLIMITED to guarantee that .offer(..) will succeed, but the current default is Channel.BUFFERED, without the option to specify the limit.
Another option would be to use Channel.UNLIMITED as the default as it's "safer"
Actually, upon further look - what's the difference between callbackFlow and channelFlow - they seem to be the same, just that callbackFlow is channelFlow with default params
Call .buffer(UNLIMITED), per the callbackFlow docs:
A channel with default buffer size is used. Use buffer operator on the resulting flow to specify a value other than default and to control what happens when data is produced faster than it is consumed, that is to control backpressure behavior.
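As a rough illustration of what the quoted docs describe, the sketch below applies `.buffer(Channel.UNLIMITED)` to a `callbackFlow`. `CallbackSource`, `observe`, and `demo` are hypothetical names invented for this example (a stand-in for a callback API such as SharedPreferences, so the snippet runs without Android). It uses `trySend`, which replaced `offer` in kotlinx.coroutines 1.5 and later. It only shows the mechanics; whether an unlimited buffer is a good idea is a separate question.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.count
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical stand-in for a callback-based API such as SharedPreferences.
class CallbackSource<T> {
    private val listeners = mutableListOf<(T) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun register(listener: (T) -> Unit) { listeners += listener }
    fun unregister(listener: (T) -> Unit) { listeners -= listener }
    fun fire(value: T) = listeners.toList().forEach { it(value) }
}

fun <T> observe(source: CallbackSource<T>): Flow<T> = callbackFlow<T> {
    val listener: (T) -> Unit = { value -> trySend(value) }
    source.register(listener)
    awaitClose { source.unregister(listener) }
}.buffer(Channel.UNLIMITED) // fused into the callbackFlow's own channel

fun demo(): Int = runBlocking {
    val source = CallbackSource<Int>()
    val received = async { observe(source).take(1_000).count() }
    // Wait until the flow has registered its listener before producing.
    while (source.listenerCount == 0) yield()
    // Fire many callbacks with no suspension in between; the collector cannot
    // run during this loop, so every value has to sit in the buffer.
    repeat(1_000) { source.fire(it) }
    received.await()
}

fun main() {
    println("received ${demo()} values")
}
```

Without the `buffer` call, the same burst would overflow the default buffer and some `trySend` calls would fail.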
Huuuuge -1 to unlimited being a good default though.
I still don't understand the difference between callbackFlow vs channelFlow - the only difference seems to be the noinline keyword. Why would I use one vs the other?
"Huuuuge -1 to unlimited being a good default though."
What makes BUFFERED a better default? I see RENDEZVOUS being on one extreme and UNLIMITED being on the other - but why have BUFFERED as a middle ground?
In this specific case, you probably want conflated. If someone misses a value change of a preference, they likely just want to get the latest value, not all of the value changes that occurred since the last one.
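A minimal sketch of the conflated variant, assuming a hypothetical `CallbackSource` in place of SharedPreferences and Int values instead of Booleans so the dropped updates are visible. `observeLatest` and `demo` are likewise invented names, and `trySend` is the kotlinx.coroutines 1.5+ replacement for `offer`.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical stand-in for a callback-based API such as SharedPreferences.
class CallbackSource<T> {
    private val listeners = mutableListOf<(T) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun register(listener: (T) -> Unit) { listeners += listener }
    fun unregister(listener: (T) -> Unit) { listeners -= listener }
    fun fire(value: T) = listeners.toList().forEach { it(value) }
}

fun <T> observeLatest(source: CallbackSource<T>): Flow<T> = callbackFlow<T> {
    // With a conflated channel, trySend replaces any value the collector has not taken yet.
    val listener: (T) -> Unit = { value -> trySend(value) }
    source.register(listener)
    awaitClose { source.unregister(listener) }
}.conflate()

fun demo(): List<Int> = runBlocking {
    val source = CallbackSource<Int>()
    val seen = mutableListOf<Int>()
    val job = launch { observeLatest(source).collect { seen += it } }
    // Wait until the flow has registered its listener before producing.
    while (source.listenerCount == 0) yield()
    // A burst of 100 updates while the collector has no chance to run.
    (1..100).forEach { source.fire(it) }
    // Let the collector catch up to the latest value, then stop it.
    while (seen.lastOrNull() != 100) yield()
    job.cancelAndJoin()
    seen
}

fun main() {
    val seen = demo()
    println("collector saw ${seen.size} of 100 updates, last = ${seen.last()}")
}
```

The collector skips intermediate updates but always ends up with the most recent one, which is usually what a preference observer needs.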
But in general, unlimited buffers are how you get cascading failures rather than localized ones (or potentially none at all). Unlimited queues are very hard to recover from when the producer is faster than the consumer because there's neither backpressure being applied nor points where you can shed load (because how would you even know?).
Especially in the design of Flow, backpressure is handled naturally and arguably transparently inside the system such that producers can't outrun consumers. At the points where you bridge in and out of the suspension world, there's still signals like the return value of offer or runBlocking and actual caller-blocking which you can use to try and slow (or stop) your producer. Too-small or too-large buffers are things you can tweak over time based on actual usage patterns, but every unlimited buffer is just a time-bomb for a crash and/or outage.
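One way to act on the return value mentioned above is to check the result of each send and shed load explicitly when the buffer is full. This is a sketch under stated assumptions: `CallbackSource`, `observe`, `onDropped`, and `demo` are hypothetical, the capacity of 4 is arbitrary, and `trySend(...).isFailure` is the kotlinx.coroutines 1.5+ counterpart of `offer` returning `false`.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical stand-in for a callback-based API such as SharedPreferences.
class CallbackSource<T> {
    private val listeners = mutableListOf<(T) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun register(listener: (T) -> Unit) { listeners += listener }
    fun unregister(listener: (T) -> Unit) { listeners -= listener }
    fun fire(value: T) = listeners.toList().forEach { it(value) }
}

fun <T> observe(source: CallbackSource<T>, onDropped: (T) -> Unit): Flow<T> = callbackFlow<T> {
    val listener: (T) -> Unit = { value ->
        // A failed trySend means the bounded buffer is full: the producer is
        // outrunning the consumer, and this is the point where load can be shed.
        if (trySend(value).isFailure) onDropped(value)
    }
    source.register(listener)
    awaitClose { source.unregister(listener) }
}.buffer(4) // small, bounded buffer (arbitrary size for the example)

fun demo(): Pair<List<Int>, List<Int>> = runBlocking {
    val source = CallbackSource<Int>()
    val received = mutableListOf<Int>()
    val dropped = mutableListOf<Int>()
    val job = launch { observe(source) { dropped += it }.collect { received += it } }
    // Wait until the flow has registered its listener before producing.
    while (source.listenerCount == 0) yield()
    // Ten callbacks in a row; the collector cannot run until this loop ends.
    (1..10).forEach { source.fire(it) }
    // Let the collector drain whatever was accepted, then stop it.
    while (received.size + dropped.size < 10) yield()
    job.cancelAndJoin()
    received to dropped
}

fun main() {
    val (received, dropped) = demo()
    println("received=$received dropped=$dropped")
}
```

With a bounded buffer, every value is either delivered or visibly rejected, so the overload shows up at the producer instead of growing silently in an unlimited queue.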
@ZakTaccardi There is no difference between callbackFlow and channelFlow. They are the same. They just have different names to tailor their docs to the specific use case and to let the code using them convey your intention. E.g., when I read your code, I can immediately see what you are planning to do just from the name of the flow builder you are using.