What do we need to do if we need to send or receive a large amount of data? If there is a slow consumer, we need to be able to pause and resume reading or writing as necessary (in Netty, ChannelConfig.setAutoRead()). I don't see any code in ktor that does anything like that.
Let's wait for a proper reply from someone who knows the internals in detail, since I may be wrong on this,
but as far as I understand, backpressure is a concern in reactive code,
when you are either writing too much or generating more read events than can be processed fast enough.
As far as I understand, kotlinx.io streams are not reactive; they read on demand. So you read a chunk (suspending), then write that chunk (suspending)
until it is written or buffered, and you do not try to read another chunk before that's done.
That's what pipes usually do, and it should work transparently with imperative-style suspending functions.
I would expect TCP, the operating system, or kotlinx.io to "autopause" the stream once the internal buffers for that connection fill up, until they are consumed.
With a reactive approach, you have a callback function, and that callback can write somewhere else, but you can't tell the producer, "hey, wait until I finish writing this".
As I said this is what I understand, but not an official answer, so maybe I'm wrong :) Let's wait for confirmation or proper explanation.
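To make the "read on demand" idea above concrete, here is a minimal sketch using only the Kotlin standard library. It is not ktor or kotlinx.io code; `chunks`, `consume`, and `log` are hypothetical names invented for illustration. The producer is a `sequence { }` builder, which suspends at `yield` until the consumer pulls the next element, so it can never run ahead of a slow consumer.

```kotlin
// Records the order of events so the interleaving is visible.
val log = mutableListOf<String>()

// Hypothetical source: produces three chunks and records when each one is produced.
fun chunks(): Sequence<ByteArray> = sequence {
    for (i in 1..3) {
        log += "produce $i"
        // The producer is suspended here until the consumer asks for the next chunk.
        yield(ByteArray(4) { i.toByte() })
    }
}

// Hypothetical sink: records when each chunk is consumed.
fun consume(chunk: ByteArray) {
    log += "consume ${chunk[0]}"
}

fun main() {
    for (chunk in chunks()) consume(chunk)
    // Prints [produce 1, consume 1, produce 2, consume 2, produce 3, consume 3]
    println(log)
}
```

The log strictly alternates between producing and consuming, which is the pull-based behaviour described above: no chunk is read before the previous one has been handled.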
Nothing needs to be done on your side to get backpressure working: all backpressure control is done inside ktor, so the user's coroutine simply suspends in the read/write suspend functions if the rx/tx speed is too low. So there is no need for pause/resume in the ktor API.
Consider the following example:
post("/url") {
    val form = call.receiveParameters() // suspension here
    call.respondText(".....") // suspension here
}
Here we have two suspension points. Inside receiveParameters and respondText there are suspendable copying loops. Internally they call context.read() and context.writeAndFlush() with a listener, but you don't need to worry about that.
Thanks! How about when you are trying to write? How would it know that you are writing too fast and suspend your call?
We suspend until the last write future (Netty's future) succeeds. But we don't do it on every write operation, otherwise it would be too slow. See NettyResponsePipeline.kt.
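A rough sketch of that idea, using only the Kotlin standard library. This is not the code in NettyResponsePipeline.kt; `WriteFuture`, `FakeTransport`, `writeAll`, and the `awaitEvery` parameter are hypothetical stand-ins invented to illustrate "suspend on the last write future, but not on every write".

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical stand-in for a Netty write future: completes later and notifies one listener.
class WriteFuture {
    private var done = false
    private var listener: (() -> Unit)? = null
    fun addListener(block: () -> Unit) { if (done) block() else listener = block }
    fun complete() { done = true; listener?.invoke(); listener = null }
}

// Hypothetical transport: accepts every write immediately and returns a future for it.
class FakeTransport {
    private val pending = ArrayDeque<WriteFuture>()
    var accepted = 0
        private set
    fun write(chunk: ByteArray): WriteFuture {
        accepted++
        return WriteFuture().also { pending.addLast(it) }
    }
    // Simulates the socket draining: completes only the writes outstanding at the time of the call.
    fun drain() = repeat(pending.size) { pending.removeFirst().complete() }
}

// Suspends the calling coroutine until the future completes, without blocking a thread.
suspend fun WriteFuture.await(): Unit = suspendCoroutine { cont ->
    addListener { cont.resume(Unit) }
}

// Writes all chunks, waiting on the most recent future only every `awaitEvery` writes.
suspend fun writeAll(transport: FakeTransport, chunks: List<ByteArray>, awaitEvery: Int) {
    var last: WriteFuture? = null
    for ((index, chunk) in chunks.withIndex()) {
        val future = transport.write(chunk)
        last = future
        if ((index + 1) % awaitEvery == 0) future.await() // backpressure point
    }
    last?.await() // make sure the final write has completed before returning
}

fun main() {
    val transport = FakeTransport()
    var finished = false
    val body: suspend () -> Unit = { writeAll(transport, List(10) { ByteArray(8) }, awaitEvery = 4) }
    body.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { finished = true })
    println("accepted before any drain: ${transport.accepted}") // 4
    transport.drain()
    println("accepted after first drain: ${transport.accepted}") // 8
    transport.drain()
    transport.drain()
    println("finished: $finished") // true
}
```

The writer gets at most `awaitEvery` writes ahead of the transport and then suspends, so a slow peer slows the writer down without a separate pause/resume API.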
How about streaming a big file? I have some code like this:
val chunks: ReceiveChannel<ByteArray> = /* A channel from upstream service */
call.respondOutputStream(ContentType.Application.OctetStream, HttpStatusCode.OK) {
    for (chunk in chunks) {
        // TODO How to handle backpressure here, seems write() is a blocking operation?
        write(chunk)
    }
}
@xiaodongw you can check a sample about streaming content here: https://github.com/ktorio/ktor-samples/blob/master/other/reverse-proxy/src/ReverseProxyApplication.kt#L69