Kotlinx.coroutines: Introduce interop between CoroutineContext and Reactor Context

Created on 13 Mar 2018  路  15Comments  路  Source: Kotlin/kotlinx.coroutines

Reactor Context seems similar to CoroutineContext, so I tend to think it should be interesting to upgrade kotlinx-coroutines-reactor for seamless integration between both.

@elizarov Any thoughts?

enhancement

Most helpful comment

@sdeleuze It makes total sense. They seem to fit conceptually. We can have Context.asCoroutineContext extension, so that you can mix it into the coroutine context, implement ReactorContext: CoroutineContext.Key, so that you can retrieve it back via coroutineContext[ReactorContext], and we can have all the reactor builders automatically extract the rector context from their coroutine context and pass it on. I'm open to PR.

All 15 comments

@sdeleuze It makes total sense. They seem to fit conceptually. We can have Context.asCoroutineContext extension, so that you can mix it into the coroutine context, implement ReactorContext: CoroutineContext.Key, so that you can retrieve it back via coroutineContext[ReactorContext], and we can have all the reactor builders automatically extract the rector context from their coroutine context and pass it on. I'm open to PR.

Hi, any update on the integration of the related commit?

@elizarov, @SokolovaMaria, @qwwdfsad Can you provide an update for this ?
I see that this feature is implemented in PR but still is not merged.

PR is on review, we are hopefully going to merge it to 1.3.0-RC

@qwwdfsad, Sounds good. Thanks for update!

Thanks for this first step, but I think this issue should be reopened for refinements before releasing 1.3 final. Based on my current understanding, with Coroutines 1.3.0-RC what is currently supported for Spring Framework use case is context interop for converting suspending function to Mono.

For Mono to suspending function we are using extensions like suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? or suspend fun <T> Publisher<T>.awaitSingle(): T which are at kotlinx.coroutines.reactive level so unlikely to support context interop. I guess we need to discuss how this should be exposed at API level. What about providing extensions like suspend fun <T> Mono<T>.awaitFirstOrNull(copyContext: Boolean = true): T? at Mono level?

About Flux, I am not sure how to use kotlinx.coroutines.reactor.FluxKt#flux. What we need is Flux to and from Flow interop, we currently use Publisher<T>.asFlow() and Flow<T>.asPublisher(). I suppose we would need something like Flux<T>.asFlow(copyContext: Boolean = true) and Flow<T>.asFlux(copyContext: Boolean = true).

In these example I have set the default value of copyContext to true for the sake of consistency since the kotlinx.coroutines.reactor.MonoKt#mono function does that by default. Maybe kotlinx.coroutines.reactor.MonoKt#mono should have this parameter too. I am open to set this optional parameter to false by default if you think that's a better default (for performances reason for example. But it would be nice to be consistent and provide this optional parameter everywhere where such context copy is performed.

Any thoughts?

@sdeleuze can you, please, show a slightly more complete motivating example so that we have some sample code to verify our proposed solution against.

@elizarov Sure, but you will have to wait next week since I am in vacation without any laptop.

After a bit of discussion, we've decided to provide the following API:

  • All Mono and Publisher extensions such as await* do capture current coroutine context and propagate ReactorContext to the Mono. The indirection is done via ServiceLoader mechanism, all you have to do is to add kotlinx-coroutines-reactor to the classpath.
    For example, in the following snippet
withContext(Context.of("key",  "value").asCoroutineContext()) {
    someMono.await()
}

context will be propagated to mono as long as kotlinx-coroutines-reactor is present in the classpath.

  • Publisher.asFlow propagate context to the underlying flux.
    For example, in the following snippet
val flow = flux.asFlow() 
withContext(Context.of("key",  "value").asCoroutineContext()) {
    flow.collect {
        // ...
    }
}

context is propagated to the original flux

  • New builder, Flow.asFlux that properly propagates SubscriberContext to the flow (using flowOn).

For example, in the following snippet

flow {
    someMono.await()
}.asFlux()..subscriberContext { ctx -> ctx.put("key", "value") }

the context will be propagated to flow body and, transitively, to someMono.

We are planning to merge this change to the upcoming 1.3 this week.

That sounds perfect, please let me know if you need more input from me.

Will this solve the problem that I cannot access the ReactiveSecurityContextHolder.getContext fom a coroutine?

@Test
@DisplayName("it should propagate the security context to coroutine")
fun testContextCoroutine() {
    val ctx = mono {
        getContextCoroutine()
    }
            .subscriberContext { c -> c.putAll(ReactiveSecurityContextHolder.withAuthentication(UsernamePasswordAuthenticationToken("", ""))) }
            .block()
    assertNotNull(ctx)
}
suspend fun getContextCoroutine(): SecurityContext? {
    return ReactiveSecurityContextHolder.getContext().awaitFirstOrNull()
}

Will this solve the problem that I cannot access the ReactiveSecurityContextHolder.getContext fom a coroutine?

@eiswind Yes

I tried the test above against the reactor-context branch and it still fails. I guess I have to do something to propagate the context?

The value is provided in the Reactor Context (in real life this is done by spring security), and I can see the value in the CoroutineContext. But it does seem to get lost on the awaitFirstOrNull.

Bildschirmfoto von 2019-08-05 15-02-50
Bildschirmfoto von 2019-08-05 15-13-22

After some investigation I found that the changes in Await.kt do not seem to be in this branch. So I guess I have to wait until everything is merged.

@eiswind Works for me in reactor-context branch.

Thx for the reply.

.subscriberContext { c -> c.putAll(ReactiveSecurityContextHolder.withAuthentication(UsernamePasswordAuthenticationToken("", ""))) }

seems to be just the wrong way.

.subscriberContext { c -> c.put(SecurityContext::class.java, Mono.just(SecurityContextImpl(UsernamePasswordAuthenticationToken("", "")))) }

did the trick.
Again thanks for the support.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

iTanChi picture iTanChi  路  3Comments

IgorKey picture IgorKey  路  3Comments

Leftwitch picture Leftwitch  路  3Comments

mhernand40 picture mhernand40  路  3Comments

Pitel picture Pitel  路  3Comments