Using ktor-server-netty 0.9.4
kotlin 1.2.51 on jdk8
We wrote a little http stub in kotlin using ktor that helps us measuring the performances of some of our microservices.
The stub answers to POST and GET and returns the same body all the time.
I've been hitting it right now with 100 concurrent cyclical requests (Jmeter) and I'm getting this error in the logs:
2018-09-11 09:49:45.146 [nettyWorkerPool-3-1] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:331)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:137)
io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:147)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:748)
2018-09-11 09:49:56.964 [nettyWorkerPool-3-3] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html ffun run(stubArgs: StubArgs) {or more information.
Recent access records:
Created at:
io.netty.buffer.SimpleLeakAwareByteBuf.unwrappedDerived(SimpleLeakAwareByteBuf.java:143)
io.netty.buffer.SimpleLeakAwareByteBuf.readRetainedSlice(SimpleLeakAwareByteBuf.java:67)
io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:305)
io.netty.handler.codec.http.HttpServerCodec$HttpServerRequestDecoder.decode(HttpServerCodec.java:101)
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:748)
Whenever the error appears, after an unpredictable amount of time the stub is going to be choked and not respond to requests anymore (or intermittently)
This is the main stub code. As you can see we are using Kotlin's coroutines. Not sure if the problem is related to that :
private fun run() {
...
val server = embeddedServer(Netty, 8080) {
install(CORS) { anyHost() }
install(CallLogging)
install(DefaultHeaders)
install(Routing) {
get() { call.respondedText(delayedResponse(call.response, returnDelay, returnBody)) }
post() { call.respondedText(delayedResponse(call.response, returnDelay, returnBody)) }
}
}
server.start()
}
private suspend fun delayedResponse(response, returnDelay, returnBody) {
delay(returnDelay)
return returnBody
}
fun main(args) {
runBlocking {
run()
}
}
Worth mentioning that the problem is seen only when we send a POST request storm (body is about 8kilobytes of XML). It is not seen during GET request storm.
Any advice on what is happening?
it looks like some kind of memory leak. The stub is deployed in a container and i'm seeing MEM USAGE raising almost linearly with time during the performance run. (CPU time is about 11% steady) using docker stats
I confirm leak: it happens because you don't consume request body. So you need to do call.receive
Thank you, that worked beautifully.
I don't see the error anymore in the logs when i do POSTs.
I see still some strange behaviour: he memory allocation is still raising with time, but it is slower now. If i set a memory limit in Docker (e.g. 1024mb) and the container reaches it, we see a big increment in latency and a big degradation of throughput.
However no HTTP errors at all anymore.
On GETs, there is no memory increment at all.
How do you receive the content?
i use this:
val body: ByteArray = call.receive<InputStream>().readBytes()
both in GET and POST contexts, even if for GET it does not matter.
You are receiving the whole content to byte array so if there are a lot of requests running concurrently then you can run out of heap
Also note that InputStream is blocking so you are holding a thread
Uhm in the past I tried .receiveText() but I hit a different wall. Will try again as soon as I am back to work. It's fine for us that IS is blocking, we are building a stub after all.
I am not grasping the comment about runnig out of heap. In principle if I hit ktor continously with 100 concurrent requests with a performance tool, it will hold at any time up to max 100 threads, right? In my limited understanding of the memory model I believe that the memory occupation should reach a steady level and not be growing.
A little update, unfortunately not very useful. I tried to consume with call.receiveText() instead of call.receive<InputStream>(), butd I get so many errors during the performances. As far as I remember this is also the reason why we were using InputStream receiver to begin with -- guess I'm doing something wrong somewhere.
Fixed
Most helpful comment
I confirm leak: it happens because you don't consume request body. So you need to do
call.receive