Kotlinx.coroutines: Provide atomic way to transform MutableStateFlow's state

Created on 16 Nov 2020  路  10Comments  路  Source: Kotlin/kotlinx.coroutines

Some state updates require to know the current state. A simple example is a StateFlow<List<T>> which we want to maintain by subscribing to created/deleted/modified events about these T objects.
For each event, we need to read the existing List in the state, and set the state to the new modified list.

My current use case is the following (uses a map, but the idea is the same):

suspend fun ChromeBrowserSession.watchTargetsIn(coroutineScope: CoroutineScope): StateFlow<Map<TargetID, TargetInfo>> {
    val targetsFlow = MutableStateFlow(emptyMap<TargetID, TargetInfo>())

    target.events().onEach { targetsFlow.value = targetsFlow.value.updatedBy(it) }.launchIn(coroutineScope)

    // triggers target info events
    target.setDiscoverTargets(SetDiscoverTargetsRequest(discover = true))
    return targetsFlow
}

private fun Map<TargetID, TargetInfo>.updatedBy(event: TargetEvent): Map<TargetID, TargetInfo> = when (event) {
    is TargetEvent.TargetCreatedEvent -> this + (event.targetInfo.targetId to event.targetInfo)
    is TargetEvent.TargetInfoChangedEvent -> this + (event.targetInfo.targetId to event.targetInfo)
    is TargetEvent.TargetDestroyedEvent -> this - event.targetId
    is TargetEvent.TargetCrashedEvent -> this - event.targetId
}

This is all fine, but sometimes we may want the MutableStateFlow to be accessed concurrently from multiple places.
I used to have separate subscription methods for different kinds of events, and had to use 4 coroutines to subscribe to each kind until I created a different API for all events().
In that case, I had to externally synchronize operations to ensure that I would read and write the state atomically, value = value.updatedBy(event) doesn't really work on its own.

There is already compareAndSet which provides this kind of guarantees, but I wish there were a transform method like:

/** Atomically sets the value of this `MutableStateFlow` by applying the given [transform] on the current state. */
fun <T> MutableStateFlow<T>.transformValue(transform: (currentState: T) -> T)

Is there already something like this? Am I missing something here?

question

All 10 comments

Yes, there is a compareAndSet atomic operation which, being a universal operation, can be used to implement an arbitrary pure transfromation. Does it help in your case?

AFAIU, compareAndSet will simply compare the current state with a state that I need to provide, and set the state to whichever other value I provide if there is a match.
It seems it would be quite inefficient to implement transformState in terms of compareAndSet. Unless I missed something, this is what it would look like:

fun <T> MutableStateFlow<T>.transformValue(transform: (currentState: T) -> T) {
    var currentState: T
    do {
        currentState = value
        val newState = transform(currentState)
    } while (!compareAndSet(currentState, newState))
}

This has significant drawbacks:

  1. each try would compare the whole state (in my case the whole list/map, which is quite inefficient)
  2. each try calls the transform, which could be anything (and may be expensive)
  3. it needs to actively loop until there is no race condition

That's why I believe another primitive might be needed here. Do you see an efficient alternative to the above code?

@joffrey-bion it seems that what you need is mutual exclusion. I would recommend guarding state update with a Mutex in that case

@qwwdfsad yes, that's what I meant by:

In that case, I had to externally synchronize operations to ensure that I would read and write the state atomically, value = value.updatedBy(event) doesn't really work on its own.

It just seemed to me that MutableStateFlow was meant to be used concurrently, and thus remove the hassle of synchronization from the user. This is (I guess) why compareAndSet was provided, and that's why I wondered whether you would consider supporting transformAndSet() out of the box as well, which is a more powerful low level primitive.

compareAndSet is here to provide an additional degree of flexibility. Mostly it's done for users that want diistintUntilChanged operator with custom comparator applied. We decided, that compareAndSet is a more powerful, flexible and have lesser footprint than additional comparator argument in all constructor functions.

It was not meant to be used as a low-level synchronization primitive. For the same reasoning, we won't introduce transformAndSet, which also implies a lot of low-level issues with its implementation: what to do if it prevents the progress of the whole flow, how to ensure that it's not going to be abused by application-level usage etc.

@qwwdfsad Thanks for the explanation. I thought using compareAndSet as a low-level primitive was what @elizarov suggested here, hence my reply about the inefficiency of using it for the purpose of atomic transformation. It does work for simple things like increment, which I have just noticed is implemented similarly to the code I provided above, but indeed it doesn't seem suitable for complex states.

Mostly it's done for users that want diistintUntilChanged operator with custom comparator applied. We decided, that compareAndSet is a more powerful, flexible and have lesser footprint than additional comparator argument in all constructor functions.

This is just curiosity now, but could you please expand a bit on this? I didn't see distinctUntilChanged use compareAndSet, and compareAndSet doesn't seem to support a custom comparator, so I think I'm missing the point.

Also, I'm not sure I understand the concerns you're mentioning for transformAndSet here. I was in the mindset of using MutableStateFlow as the source of the flow so I didn't believe it could prevent the progress of any existing flow. But maybe you're talking about custom operators on Flow that would create special implementation of StateFlows out of it?

Regarding application-level abuse, that's actually what I wanted to prevent by asking for a sane primitive instead of users building a world around the updates of a MutableStateFlow. The absence of it currently leads to using Mutex, or single-threaded coroutine dispatchers, or moving things to a single coroutine acting on the flow, or the crazy code I provided above etc.
But I'm probably missing a whole bunch of other issues that it would create to support transformAndSet(), so I trust your word on this.

This is just curiosity now, but could you please expand a bit on this? I didn't see distinctUntilChanged use compareAndSet, and compareAndSet doesn't seem to support a custom comparator, so I think I'm missing the point.

Sure. They are not used internally, but you can emulate it using these primitives: you'll have to use custom comparator outside of the CAS and rely on the value identity during CAS. Additionally, CAS is useful for things like atomic increments and emulating atomic integers like a Flow.

Also, I'm not sure I understand the concerns you're mentioning for transformAndSet here.

I was thinking about transform block being invoked exclusively and always successfully if the expected value matches the current one. Otherwise, it wouldn't be really different from the implementation you proposed here

If exclusivity is not an issue, then your implementation should suffice

Thanks for the explanation.

I was thinking about transform block being invoked exclusively and always successfully if the expected value matches the current one. Otherwise, it wouldn't be really different from the implementation you proposed here.
If exclusivity is not an issue, then your implementation should suffice

I think we are thinking the same thing, but I probably don't express myself properly in the correct terms. In my initial idea of transformAndSet there would be no "expected" value at all (at least not from the user's point of view).

The goal I was aiming at here is just ensuring that concurrent transformations are executed one after the other, so yes transform should have exclusive access to the state in order to read-transform-write atomically. The order wouldn't matter, hence no real notion of expected value at all, as the user is given the current state value through transform's parameter.

I guess what I'm looking for is simply:

fun <T> MutableStateFlow<T>.transformAndSet(transform: (T) -> T) {
    synchronized(this) {
        value = transform(value)
    }
}

But maybe that's too na茂ve? Being library code, interactions with compareAndSet could also be handled. Here I'm using the same lock synchronized(this), but it's just because I happen to know the implementation of compareAndSet, which could change. If compareAndSet uses another lock someday, my code wouldn't work if called concurrently with compareAndSet.

Some context: StateFlow and SharedFlow are using synchornized(this) as an implementation detail. Synchronization there is fast and fine-grained and no coroutines/collectors are ever resumed under the lock. This is just more faster/simpler alternative to lock-free implementation (taking into account all the constraints).

Now the problem with transformAndSet is that now we expose the primitive that invokes a user-supplied arbitrary transform that is executed under the lock. And this lambda can block indefinitely, be really slow, execute disk IO etc. All of that while holding the lock (-> preventing the progress of all emitters and collectors). While it may work for you, because you may be aware of these limitations, such API exposed as a general-purpose one is just a time-bomb waiting to explode.
So I'd suggest you to stick with your extension method, but it's unlikely that we want incorporate it in the default API

Very clear explanation, thanks a lot. Yes, this implementation has problems, that's why I was thinking that having something smarter in the library would prevent wrong user-implementations like this. But I guess there is no simple solution to this, so I guess I understand why you prefer not to provide anything rather than providing some risky primitive.

Again thanks for your time explaining this. Please feel free to close this issue.

Was this page helpful?
0 / 5 - 0 ratings