fallocate -l 5G test.imgpython3 -m http.server 8000org.kestra.task.fs.http.DownloadTest test public static final String FILE = "http://0.0.0.0:8000/test.img";ls -l /tmp/download_7326042757437030257.img that will always be exactly at 4110352384 bytes (3,9G)You will see these errors :
2020-07-07 21:42:42,179 ERROR oopGroup-3-2 LeakDetector LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:363)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:139)
io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:147)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.base/java.lang.Thread.run(Thread.java:834)
2020-07-07 21:42:42,330 ERROR oopGroup-3-2 LeakDetector LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.SimpleLeakAwareByteBuf.unwrappedDerived(SimpleLeakAwareByteBuf.java:143)
io.netty.buffer.SimpleLeakAwareByteBuf.readRetainedSlice(SimpleLeakAwareByteBuf.java:67)
io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:292)
io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:202)
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:498)
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:437)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "main" java.lang.OutOfMemoryError: Direct buffer memory
at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:758)
at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:734)
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:245)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:227)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:147)
at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:356)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:139)
at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:147)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Maybe I do something wrong on the class org.kestra.task.fs.http.Download :
Long size = client
.exchangeStream(request)
.map(response -> {
if (builder.code == null) {
builder
.code(response.code())
.headers(response.getHeaders().asMap());
}
if (response.getBody().isPresent()) {
byte[] bytes = response.getBody().get().toByteArray();
output.write(bytes);
return (long) bytes.length;
} else {
return 0L;
}
})
.reduce(Long::sum)
.blockingGet();
But I couldn't catch it for now, since I consumed every http chunk and write simply to a temporary files.
You cannot store a 4gb file into memory, which is what you're asking the client to do here. Use the dataStream method
Thanks @jameskleeh for the response.
But I've exactly the same behaviour with dataStream method.
ludo  /  tmp  test  ls -ll ../download_*
-rw-rw-r-- 1 ludo ludo 4110352384 juil. 8 07:24 ../download_15179365780414817250.img > with exchangeStream
-rw-rw-r-- 1 ludo ludo 4110352384 juil. 7 21:42 ../download_7326042757437030257.img > with dataStream
And the same exception & netty leaks on the console are raise.
Extracted code :
Long size = client
.dataStream(request)
.map(body -> {
byte[] bytes = body.toByteArray();
output.write(body.toByteArray());
return (long) bytes.length;
})
.reduce(Long::sum)
.blockingGet();
Looking at dataStream method, seems to be a wrapper around exchangeStream
Anything wrong here ?
Thanks for help :+1:
@jameskleeh can you consider to reopen this ?
Please update the sample app with the usage of dataStream if you haven't already
I've update the app on branch datastream using dataStream instead of exchangeStream
Thanks ;)
@jameskleeh any update on this ?
sorry for delay testing this, need to upgrade to micronaut 2.2!
thanks @jameskleeh, works perfectly !
also works with exchangeStream FYI
Most helpful comment
@jameskleeh any update on this ?