I often find myself writing flow.toList().last(), usually in code that works with a Flow which represents progress, when I want to receive the final item. A last() operator would be complementary to first()/single().
It can also be replaced with something like
.filterIsInstance<SomeTerminalItemType>()
.single()
but it's not always so simple: for example, a sealed hierarchy may have several terminal types for an operation.
Reactive libraries have such an operator: http://reactivex.io/documentation/operators/last.html
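To make the two workarounds above concrete, here is a minimal sketch. The Upload hierarchy and the upload() function are hypothetical names invented for this illustration; only toList(), filterIsInstance() and single() come from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical progress model, made up for this example.
sealed class Upload {
    data class Progress(val percent: Int) : Upload()
    data class Done(val url: String) : Upload()
}

// A cold flow that reports progress and ends with a single result item.
fun upload(): Flow<Upload> = flow {
    emit(Upload.Progress(50))
    emit(Upload.Progress(100))
    emit(Upload.Done("file://result"))
}

// Workaround 1: buffer every item in a list and keep only the last one.
suspend fun viaToList(): Upload = upload().toList().last()

// Workaround 2: only works while there is exactly one terminal type.
suspend fun viaFilter(): Upload.Done =
    upload().filterIsInstance<Upload.Done>().single()

fun main(): Unit = runBlocking {
    println(viaToList())
    println(viaFilter())
}
```

The first variant allocates a list just to throw most of it away, and the second stops being enough as soon as a second terminal type is added to the hierarchy.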
Flow which represents progress
Have you considered a StateFlow?
@fvasco How would StateFlow help with it?
I need the final value of the flow, not the current value
Interesting. Can you elaborate, please, on the kind of case where you ended up needing it? What kind of flow is that?
Any kind of progress where the final item of the flow is some result item.
The most recent case was a Flow which represents CI job states, such as in queue, executing, finished and error. In some cases I use it like runCiJob().last(). Here I have 2 terminal states, finished and error, so I cannot use filterIsInstance().
I also had similar cases with a Flow which represents some file processing (we have a lot of them): it emits progress, and the final item is the resulting processed file, so the consumer often just wants to start the Flow and receive the final result.
Hi, @gildor,
you need a Flow with a state. Strictly speaking, in your case you should prefer a Deferred (or a CompletableStateFlow, which looks very ugly).
I am not suggesting that you use a Deferred instead of Flow; I am considering that a deferred task should put its result in a Deferred.
@fvasco I really don't understand your suggestion. Yes, indeed I can use Deferred, but it works if I care only about the final result, but it's not the case: I may still need intermediate states to perform some side effect (log, show progress, update status, etc.)
I am considering that a deferred task should put its result in a Deferred
Even this doesn't make sense to me; I don't see how Deferred makes it better.
It's just about convenience, not that something cannot be implemented without last().
Currently, instead of exposing one Flow in an API, I usually introduce 2 methods: one returning a flow with all intermediate states and one suspend function returning the final result. It works fine for frequently used APIs, but sometimes I just want to receive the final result of an existing Flow API.
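For illustration, the two-method pattern described above might look roughly like this. FileProcessor, Processing and both method names are hypothetical, and the suspend variant still relies on the toList().last() workaround.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical file-processing events, invented for this sketch.
sealed class Processing {
    data class Progress(val percent: Int) : Processing()
    data class Result(val path: String) : Processing()
}

class FileProcessor {
    // Method 1: the full stream, intermediate states followed by the result.
    fun processWithProgress(input: String): Flow<Processing> = flow {
        emit(Processing.Progress(0))
        emit(Processing.Progress(100))
        emit(Processing.Result("$input.processed"))
    }

    // Method 2: convenience wrapper for callers that only want the result.
    // Assumes the last emitted item is always a Result.
    suspend fun process(input: String): String {
        val lastItem = processWithProgress(input).toList().last()
        return (lastItem as Processing.Result).path
    }
}

fun main(): Unit = runBlocking {
    val processor = FileProcessor()
    // A consumer that cares about intermediate states for side effects.
    processor.processWithProgress("a.txt")
        .onEach { println("state: $it") }
        .collect()
    // A consumer that only wants the final result.
    println(processor.process("a.txt"))
}
```

With a last() operator, the second method would not need to exist on every such API; the caller could take the final item directly from the Flow.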
Hi, @gildor,
sorry for my wording, I'll try to rephrase it better.
Yes, indeed I can use Deferred, but it works if I care only about the final result, but it's not the case
Yes, indeed.
I'll try to explain my concerns; they are not strictly related to the last() operator.
I am considering your use case because it looks like a common one.
In your example, you wrote: runCiJob().last().
A first API specification could be fun runCiJob(): Job, but it allows only three states and it is not possible to get updates.
A second proposal could be fun runCiJob(): Flow<State>, which may return a hot Flow. If this Flow does not hold a current state, each collector has to wait for the next emission.
As a corner case, if a coroutine calls runCiJob() and resumes after the task has completed, then the last event was already sent and the Flow was closed (the last event is no longer in the Flow).
Instead, if the Flow has a current state (third proposal), then the API could be fun runCiJob(): StateFlow<State>. However, a StateFlow cannot be closed, so last() cannot work.
In any case, you defined a custom protocol (in queue, executing, finished and error), so for your specific use case a custom function should fix the issue. You already proposed that.
suspend fun Flow<State>.await() = first { it.terminal }
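A minimal sketch of how that await() could behave with the StateFlow variant. The State hierarchy, its terminal flag and runDemo() are assumptions made up for this illustration; the actual CI model was not posted in this thread.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical CI job states; `terminal` marks the two final ones.
sealed class State(val terminal: Boolean) {
    object InQueue : State(false)
    object Executing : State(false)
    data class Finished(val artifact: String) : State(true)
    data class Error(val message: String) : State(true)
}

// The custom function from the post above: suspends until a terminal state.
suspend fun Flow<State>.await(): State = first { it.terminal }

fun runDemo(): State = runBlocking {
    // A StateFlow never completes, so we wait for a terminal value instead.
    val states = MutableStateFlow<State>(State.InQueue)
    launch {
        states.value = State.Executing
        delay(10)
        states.value = State.Finished("build.zip")
    }
    states.await()
}

fun main() {
    println(runDemo())
}
```

Because first { } cancels collection once the predicate matches, this works on a StateFlow that never closes, and a late subscriber still sees the terminal state since it is the current value.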
We have the same issue: we process a sequence of events and have to get the last one.
Unfortunately, we need to call a suspend fun inside a filter { }, so we have to switch to Flow, but last is missing.
Here our patch:
public suspend fun <T> Flow<T>.last(): T = reduce { _, value -> value }
public suspend fun <T> Flow<T>.lastOrNull(): T? = fold<T?, T?>(null) { _, value -> value }
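A small usage sketch of this patch, with a hypothetical suspending predicate isValid(). Here last() is declared to return T rather than T?, since reduce throws NoSuchElementException on an empty flow instead of returning null. The imports are explicit so that these local extensions do not clash with any operator of the same name in the library.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.runBlocking

// Keeps only the most recent value; throws if the flow is empty.
suspend fun <T> Flow<T>.last(): T = reduce { _, value -> value }

// Same idea, but yields null for an empty flow.
suspend fun <T> Flow<T>.lastOrNull(): T? =
    fold<T?, T?>(null) { _, value -> value }

// Hypothetical suspending check, the reason a Sequence would not work here.
suspend fun isValid(n: Int): Boolean {
    delay(1)
    return n % 2 == 0
}

fun main(): Unit = runBlocking {
    val events = (1..5).asFlow().filter { isValid(it) }
    println(events.last())                  // 4
    println(emptyFlow<Int>().lastOrNull())  // null
}
```

Both functions collect the whole flow, so they only return once the flow completes.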
I also have a case where having Flow<T>.last() would be quite useful. I have a helper class in my Android project that produces unique numbers with the help of DataStore. Currently, I'm using flow.conflate().first() in an attempt to get the latest value without providing an action block, which appears to work fine. For some reason, fvasco's patch didn't work for me.
Here's a short snippet:
//[...]
fun getNextRuntimeUniqueInt(): Flow<Int?> {
    return dataStore.data.mapLatest { prefs ->
        prefs[Keys.NEXT_RUNTIME_UNIQUE_INT]
    }
}
//[...]
val cnt = dataStoreHelper.getNextRuntimeUniqueInt().conflate().first()
full context:
DataStoreHelper.kt
class DataStoreHelper(val context: Context) {
    private val dataStore: DataStore<Preferences> = context.createDataStore(
        name = "preferences"
    )

    fun getNextRuntimeUniqueInt(): Flow<Int?> {
        return dataStore.data.mapLatest { prefs ->
            prefs[Keys.NEXT_RUNTIME_UNIQUE_INT]
        }
    }

    suspend fun setNextRuntimeUniqueInt(count: Int) {
        dataStore.edit { prefs ->
            prefs[Keys.NEXT_RUNTIME_UNIQUE_INT] = count
        }
    }

    fun getNextRuntimeUniqueLong(): Flow<Long?> {
        return dataStore.data.mapLatest { prefs ->
            prefs[Keys.NEXT_RUNTIME_UNIQUE_LONG]
        }
    }

    suspend fun setNextRuntimeUniqueLong(count: Long) {
        dataStore.edit { prefs ->
            prefs[Keys.NEXT_RUNTIME_UNIQUE_LONG] = count
        }
    }
}
UniqueRuntimeNumberHelper.kt
class UniqueRuntimeNumberHelper(val context: Context) {
    companion object {
        const val INITIAL_NEXT_INT = 1
        const val INITIAL_NEXT_LONG = 1L
    }

    private val dataStoreHelper: DataStoreHelper = (context.applicationContext as App).kodein.instance()

    /**
     * Get a unique Integer, should only be used for runtime uniqueness
     */
    suspend fun nextInt(): Int {
        val cnt = dataStoreHelper.getNextRuntimeUniqueInt().conflate().first()
        val intCounter = if (cnt != null) AtomicInteger(cnt) else AtomicInteger(INITIAL_NEXT_INT)
        val value = intCounter.incrementAndGet()
        return when {
            (value < Int.MAX_VALUE) -> {
                dataStoreHelper.setNextRuntimeUniqueInt(value)
                value
            }
            else -> {
                dataStoreHelper.setNextRuntimeUniqueInt(INITIAL_NEXT_INT)
                INITIAL_NEXT_INT
            }
        }
    }

    /**
     * Get a unique Long, should only be used for runtime uniqueness
     */
    suspend fun nextLong(): Long {
        val cnt = dataStoreHelper.getNextRuntimeUniqueLong().conflate().first()
        val longCounter = if (cnt != null) AtomicLong(cnt) else AtomicLong(INITIAL_NEXT_LONG)
        val value = longCounter.incrementAndGet()
        return when {
            (value < Long.MAX_VALUE) -> {
                dataStoreHelper.setNextRuntimeUniqueLong(value)
                value
            }
            else -> {
                dataStoreHelper.setNextRuntimeUniqueLong(INITIAL_NEXT_LONG)
                INITIAL_NEXT_LONG
            }
        }
    }
}
Hi @lmj0011,
the last operator is supposed to return the last emitted value of a Flow.
Similar to: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/last.html
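A rough sketch of why a reduce-based last() may appear not to work here, assuming (as seems to be the case for dataStore.data) that the source flow stays open and never completes. The preferences() function is a hypothetical stand-in, not the DataStore API.

```kotlin
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical stand-in for a flow that emits the current value and then
// stays open waiting for updates, so it never completes.
fun preferences(): Flow<Int> = flow {
    emit(41)
    awaitCancellation()
}

// The patch from earlier in the thread.
suspend fun <T> Flow<T>.last(): T = reduce { _, value -> value }

// first() returns as soon as one value arrives and cancels the collection.
suspend fun readCurrent(): Int = preferences().first()

// last() has to wait for completion, which never comes; the timeout is only
// here to keep the example from hanging.
suspend fun readLast(): Int? = withTimeoutOrNull(100) { preferences().last() }

fun main(): Unit = runBlocking {
    println(readCurrent()) // 41
    println(readLast())    // null (timed out)
}
```

Under that assumption, first() is the right tool for reading the current value, and the conflate() in front of it should not be needed for a single read.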