I'm facing a weird race condition when wrapping a Flow into a Publisher. Since I first suspected the issue to be on the consumer side (the S3 client of the AWS SDK 2), there is also a ticket in their repo:
https://github.com/aws/aws-sdk-java-v2/issues/953
My flow is created like this:
~~~
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousFileChannel
import java.nio.channels.CompletionHandler
import java.nio.file.Path
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

fun readFromFile(file: Path): Flow<ByteBuffer> = flow {
    val channel = AsynchronousFileChannel.open(file)
    channel.use {
        var filePosition = 0L
        while (true) {
            val buf = ByteBuffer.allocate(4096)
            val bytesRead = it.aRead(buf, filePosition)
            if (bytesRead <= 0)
                break
            filePosition += bytesRead
            buf.flip()
            // the following delay seems to suppress the race condition
            // delay(10)
            emit(buf)
        }
    }
}

suspend fun AsynchronousFileChannel.aRead(buf: ByteBuffer, position: Long): Int =
    suspendCoroutine { cont ->
        read(buf, position, Unit, object : CompletionHandler<Int, Unit> {
            override fun completed(bytesRead: Int, attachment: Unit) {
                cont.resume(bytesRead)
            }

            override fun failed(exception: Throwable, attachment: Unit) {
                cont.resumeWithException(exception)
            }
        })
    }
~~~
And consumed like this:
~~~
s3client.putObject(
    PutObjectRequest.builder()
        .bucket(s3config.bucket.get())
        .key("test")
        .contentLength(inputFile.length())
        .contentType(MediaType.APPLICATION_OCTET_STREAM)
        .build(),
    AsyncRequestBody.fromPublisher(readFromFile(inputFile.toPath()).asPublisher())
)
~~~
However, on the consumer side I can observe that the items of the flow are emitted by multiple threads, and even concurrently, which results in transmission failures since the ByteBuffers might be written in the wrong order.
Delaying the producing flow seems to suppress the race condition and the ByteBuffers are written in the correct order. It also works when I first collect the flow, like this:
~~~
val chunks = ArrayList<ByteBuffer>()
val c = readFromFile(inputFile.toPath())
c.collect {
    chunks.add(it)
}
~~~
... and then create a Publisher out of it:
~~~
AsyncRequestBody.fromPublisher(Flowable.fromIterable(chunks))
~~~
While debugging the S3 client implementation, I saw that the underlying IO threads (of Netty) might invoke Subscription::request while another thread is executing Subscriber::onNext at the same time. Might this be the cause of the misbehavior? Can it somehow happen that, under these circumstances, multiple threads run multiple continuations of the same coroutine in parallel?
I would highly appreciate any advice.
Out of curiosity, is the issue also mitigated if instead of using delay(...), you use yield()?
@mhernand40 Using Thread.yield() and kotlinx.coroutines.yield() instead of delay() does not mitigate the issue. The same race condition is still observable.
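For reference, the tested variant looks roughly like the following sketch (the only change compared to readFromFile above is yield() in place of the commented-out delay(10); it reuses the aRead helper):
~~~
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousFileChannel
import java.nio.file.Path
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.yield

// Same producer as above, with yield() instead of delay(10).
// The reordering on the consumer side is still observable with this version.
fun readFromFileYielding(file: Path): Flow<ByteBuffer> = flow {
    AsynchronousFileChannel.open(file).use { channel ->
        var filePosition = 0L
        while (true) {
            val buf = ByteBuffer.allocate(4096)
            val bytesRead = channel.aRead(buf, filePosition)
            if (bytesRead <= 0) break
            filePosition += bytesRead
            buf.flip()
            yield() // does not suppress the race, unlike delay(10)
            emit(buf)
        }
    }
}
~~~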
I wrote a stress test that calls Subscription.request concurrently with Subscriber.onNext and checks whether it can lead to any kind of reordering of the received elements, but it fails to reproduce this issue. I even tried to send multiple requests concurrently. For the record, here's the code: https://gist.github.com/elizarov/63f6d6eb8542e052cc74091d1faab367
It looks like something else is going on there. Is there any chance that you can create self-contained code that reproduces this problem?
I've created the following reproducer:
https://github.com/marc-christian-schulze/kotlinx-reproducer-2109
It contains 3 test cases. Two of them pass and show that, functionally speaking, the code seems to be correct. The third test case, however, shows the race condition.
Update: I've found how to fix this problem. Now figuring out what exactly is wrong, what part of the Publisher contract is violated (if any) and how to test for it.
@elizarov I highly appreciate all the hard work you've done! However, it seems it did not cover the full issue. I've updated the reproducer to kotlinx.coroutines 1.3.8 to validate the fix. At first glance all tests passed, but after a couple of executions they failed again. It seems your fix mitigated the issue so that the failure rate dropped from ~17/20 to ~3/20, but it did not fully resolve it.
Could you please have a second look?
Sure, I'll take a look. I have your reproducer code around to check.
@marc-christian-schulze I cannot reproduce it with your test framework. I've added a loop so that it repeatedly uploads a file, and it works for thousands of iterations like a charm. How do you manage to still reproduce it? Are you sure that you are running version 1.3.8?
@elizarov Now you made me curious whether I was really using 1.3.8. o.O I've therefore added a Dockerfile to the reproducer to run the tests in an isolated, reproducible environment. It again reports 2 out of 22 failed tests on my machine. Maybe it's harder to reproduce on machines with a different number of cores than mine? I was running the example on an AMD Phenom II X6 (with 6 physical cores).
Is this the same code or something else? I was trying it under macOS on a MacBook. I'll try it on a more powerful machine (albeit running on Windows). Maybe it is something that reproduces only under a Linux scheduler, though.
Same code, just built and run in the container to make sure Gradle is not pulling in any unwanted dependencies from the host machine.
I ran commit 2a3ccda90221677f3038018c75c91aadff51d241 of github.com:marc-christian-schulze/kotlinx-reproducer-2109.git and got the following on an AMD A8-3870:
~~~
Starting a Gradle Daemon (subsequent builds will be faster)
> Task :clean UP-TO-DATE
> Task :compileKotlin
> Task :compileJava NO-SOURCE
> Task :processResources
> Task :classes
> Task :inspectClassesForKotlinIC
> Task :jar
> Task :compileTestKotlin
> Task :compileTestJava NO-SOURCE
> Task :processTestResources NO-SOURCE
> Task :testClasses UP-TO-DATE
> Task :test

ExampleResourceTest > repetition 5 of 20 FAILED
    java.util.concurrent.CompletionException at CompletableFutureUtils.java:60
        Caused by: software.amazon.awssdk.core.exception.SdkClientException at SdkClientException.java:98

ExampleResourceTest > repetition 7 of 20 FAILED
    java.util.concurrent.CompletionException at CompletableFutureUtils.java:60
        Caused by: software.amazon.awssdk.core.exception.SdkClientException at SdkClientException.java:98

ExampleResourceTest > repetition 11 of 20 FAILED
    java.util.concurrent.CompletionException at CompletableFutureUtils.java:60
        Caused by: software.amazon.awssdk.core.exception.SdkClientException at SdkClientException.java:98

ExampleResourceTest > repetition 20 of 20 FAILED
    java.util.concurrent.CompletionException at CompletableFutureUtils.java:60
        Caused by: software.amazon.awssdk.core.exception.SdkClientException at SdkClientException.java:98

22 tests completed, 4 failed

> Task :test FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':test'.
> There were failing tests. See the report at: file:///workspace/build/reports/tests/test/index.html

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.

* Get more help at https://help.gradle.org
BUILD FAILED in 3m 12s
~~~
I hope this helps you.
It has something to do with Linux. It reproduces inside Docker running on macOS for me. I'll check it out.
Update so far:
- The problem goes away if I add .flowOn(Dispatchers.Default) between readFromFile(....) and .asPublisher().
- The problem also goes away if I use Dispatchers.Default in the _implementation_ of asPublisher instead of Dispatchers.Unconfined.

I still do not understand what is going on there, though. Somehow the fact that asPublisher internally uses an Unconfined dispatcher causes a problem somewhere in the bowels of the S3 API code. I see that onNext calls jump from being made from async file threads (named Thread-XX) to being made directly from Netty threads (named aws-java-sdk-NettyEventLoop-0-X), yet I cannot identify any Publisher contract violation that could cause any problem for the S3 API (all onNext calls are sequential, and they are never made before the corresponding request comes). On the other hand, so far I've failed to write a self-contained reproducer that shows there is some problem with the S3 API (it works fine when either all onNext calls are made directly from async file threads or when they are all made from Netty threads).
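Applied to the call site from the top of this thread, the first workaround would look roughly like this (same s3client, s3config and inputFile as in the original snippet):
~~~
// Workaround sketch: with flowOn(Dispatchers.Default) the chunks are emitted from
// Default dispatcher threads, so the downstream onNext calls no longer jump between
// async-file threads and Netty event-loop threads.
// (Additionally requires the kotlinx.coroutines.Dispatchers and
// kotlinx.coroutines.flow.flowOn imports.)
s3client.putObject(
    PutObjectRequest.builder()
        .bucket(s3config.bucket.get())
        .key("test")
        .contentLength(inputFile.length())
        .contentType(MediaType.APPLICATION_OCTET_STREAM)
        .build(),
    AsyncRequestBody.fromPublisher(
        readFromFile(inputFile.toPath())
            .flowOn(Dispatchers.Default)
            .asPublisher()
    )
)
~~~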
I think I discovered the spot in the S3 API code, cf.
OrderedWriteChannelHandlerContext
The underlying Netty ctx is wrapped into the above-mentioned class.
It seems they were trying to somehow order the incoming tasks (including the writeAndFlush invocation), but they do this based on whether the calling thread belongs to the event loop. Since the unconfined dispatcher runs the first coroutine on the calling thread (the Netty IO thread), the following writeAndFlush is executed immediately, although there might still be items in the queue that should go first.
This now looks to me like a bug in the S3 API.
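To make the dispatcher behavior concrete, here is a small self-contained demo (independent of the S3 code) showing that with Dispatchers.Unconfined everything after a suspension point continues on whatever thread resumes the continuation -- which in our case can be the Netty IO thread, so the subsequent writeAndFlush is invoked directly from the event loop:
~~~
import kotlinx.coroutines.*
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

// Demo only: with Dispatchers.Unconfined, the coroutine resumes on the thread
// that calls resume(), so everything after the suspension point runs there.
fun main() = runBlocking {
    val job = launch(Dispatchers.Unconfined) {
        println("before suspension: ${Thread.currentThread().name}")
        suspendCoroutine<Unit> { cont ->
            // simulate an IO completion / request() arriving on a foreign thread
            thread(name = "fake-netty-event-loop") { cont.resume(Unit) }
        }
        // this line runs on "fake-netty-event-loop", not on the original thread
        println("after suspension:  ${Thread.currentThread().name}")
    }
    job.join()
}
~~~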
@marc-christian-schulze Thanks for finding the root cause. To confirm it, I've created a standalone reproducer that does not use Kotlin Flow at all -- it is a slightly intricate implementation of a file-reading publisher. It is a fully compliant Publisher implementation that reads one more block than is being requested, but waits until request is called to hand it to the subscriber, thus mixing calls from Netty and non-Netty threads and causing the same problem: https://github.com/marc-christian-schulze/kotlinx-reproducer-2109/pull/1
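The gist of that reproducer, boiled down to a hypothetical sketch (illustrative names, not the actual code from the PR; it assumes org.reactivestreams on the classpath): chunks become ready asynchronously on a producer thread, but they are handed to the subscriber from whichever thread happens to win the drain loop -- sometimes the producer thread, sometimes the thread that called request():
~~~
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import kotlin.concurrent.thread
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

// Hypothetical single-subscriber publisher that mixes the threads onNext is
// called from, while still calling onNext strictly sequentially and only
// after a matching request() -- i.e. without violating the Publisher contract.
class MixedThreadPublisher(private val chunks: List<ByteBuffer>) : Publisher<ByteBuffer> {
    override fun subscribe(subscriber: Subscriber<in ByteBuffer>) {
        val ready = ConcurrentLinkedQueue<ByteBuffer>() // chunks prepared ahead of demand
        val requested = AtomicLong(0)                   // outstanding demand (simplified, no overflow cap)
        val wip = AtomicInteger(0)                      // serializes the drain loop between threads
        val done = AtomicBoolean(false)
        val cancelled = AtomicBoolean(false)

        // Classic work-in-progress drain: whichever thread wins delivers the elements.
        fun drain() {
            if (wip.getAndIncrement() != 0) return
            do {
                while (!cancelled.get() && requested.get() > 0) {
                    val next = ready.poll() ?: break
                    subscriber.onNext(next)        // may run on the producer OR the requester thread
                    requested.decrementAndGet()
                }
                if (done.get() && ready.isEmpty() && !cancelled.get()) {
                    subscriber.onComplete()
                    return
                }
            } while (wip.decrementAndGet() != 0)
        }

        subscriber.onSubscribe(object : Subscription {
            override fun request(n: Long) {
                requested.addAndGet(n)
                drain() // may deliver right here, on the caller's (e.g. Netty) thread
            }
            override fun cancel() { cancelled.set(true) }
        })

        // Producer thread: makes chunks available asynchronously, independent of demand.
        thread(name = "chunk-producer") {
            for (chunk in chunks) {
                ready.add(chunk)
                drain()             // may deliver here, on the producer thread
                Thread.sleep(1)     // crude pacing so both threads get a chance to win
            }
            done.set(true)
            drain()
        }
    }
}
~~~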
I've also created #2155, which will help to work around these kinds of problems.