Micronaut-core: Large http client download file with possible leak & Direct buffer memory

Created on 7 Jul 2020  Â·  7Comments  Â·  Source: micronaut-projects/micronaut-core

Steps to Reproduce

  1. clone https://github.com/kestra-io/task-fs
  2. create a big file > 4Go : fallocate -l 5G test.img
  3. start a simple webserver python3 -m http.server 8000
  4. go to org.kestra.task.fs.http.DownloadTest test
  5. change url to public static final String FILE = "http://0.0.0.0:8000/test.img";
  6. launch the test on this class
  7. look at temporary file created : ls -l /tmp/download_7326042757437030257.img that will always be exactly at 4110352384 bytes (3,9G)

You will see these errors :

2020-07-07 21:42:42,179 ERROR oopGroup-3-2 LeakDetector LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:363)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
    io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:139)
    io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:147)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.base/java.lang.Thread.run(Thread.java:834)
2020-07-07 21:42:42,330 ERROR oopGroup-3-2 LeakDetector LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.SimpleLeakAwareByteBuf.unwrappedDerived(SimpleLeakAwareByteBuf.java:143)
    io.netty.buffer.SimpleLeakAwareByteBuf.readRetainedSlice(SimpleLeakAwareByteBuf.java:67)
    io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:292)
    io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:202)
    io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:498)
    io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:437)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "main" java.lang.OutOfMemoryError: Direct buffer memory
    at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
    at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
    at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
    at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:758)
    at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:734)
    at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:245)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:227)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:147)
    at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:356)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
    at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:139)
    at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:147)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)

Maybe I do something wrong on the class org.kestra.task.fs.http.Download :

            Long size = client
                .exchangeStream(request)
                .map(response -> {
                    if (builder.code == null) {
                        builder
                            .code(response.code())
                            .headers(response.getHeaders().asMap());
                    }

                    if (response.getBody().isPresent()) {
                        byte[] bytes = response.getBody().get().toByteArray();
                        output.write(bytes);

                        return (long) bytes.length;
                    } else {
                        return 0L;
                    }
                })
                .reduce(Long::sum)
                .blockingGet();

But I couldn't catch it for now, since I consumed every http chunk and write simply to a temporary files.

bug

Most helpful comment

@jameskleeh any update on this ?

All 7 comments

You cannot store a 4gb file into memory, which is what you're asking the client to do here. Use the dataStream method

Thanks @jameskleeh for the response.
But I've exactly the same behaviour with dataStream method.

 ludo  /  tmp  test  ls -ll  ../download_*
-rw-rw-r-- 1 ludo ludo 4110352384 juil.  8 07:24 ../download_15179365780414817250.img > with exchangeStream
-rw-rw-r-- 1 ludo ludo 4110352384 juil.  7 21:42 ../download_7326042757437030257.img  > with dataStream

And the same exception & netty leaks on the console are raise.

Extracted code :

            Long size = client
                .dataStream(request)
                .map(body -> {
                    byte[] bytes = body.toByteArray();
                    output.write(body.toByteArray());

                    return (long) bytes.length;
                })
                .reduce(Long::sum)
                .blockingGet();

Looking at dataStream method, seems to be a wrapper around exchangeStream
Anything wrong here ?

Thanks for help :+1:

@jameskleeh can you consider to reopen this ?

Please update the sample app with the usage of dataStream if you haven't already

I've update the app on branch datastream using dataStream instead of exchangeStream

Thanks ;)

@jameskleeh any update on this ?

sorry for delay testing this, need to upgrade to micronaut 2.2!

thanks @jameskleeh, works perfectly !
also works with exchangeStream FYI

Was this page helpful?
0 / 5 - 0 ratings