You need to release the msg that is passed into channelRead(...)

> Am 06.07.2017 um 00:52 schrieb [email protected]:
> 
> I have a very bare bones version of netty and kafka using netty4 and 
> kafka-clients_0.10.1.1. 
> I am hitting memory leak issues when I load testing my server. 
> 
> https://github.com/dhawangayash/netty-kafka-producer/tree/master/netty
> 
> Here's how I run the Netty server. 
> java -Dio.netty.recycler.maxCapacity=0 -Dio.netty.leakDetectionLevel=advanced 
> -Dorg.slf4j.simpleLogger.defaultLogLevel=DEBUG 
> -Dbootstrap.servers=localhost:9092 -Dtopic=test_topic -jar 
> target/netty4-httpserver-0.0.1-jar-with-dependencies.jar 
> com.netty.httpserver.HttpServerNetty
> 
> When I add the advanced leak detection I am seeing my worker threads are 
> leaking memory when publishing messages to kafka. (I noticed this to increase 
> when my Kafka broker applies backpressure.) Here's the code snippet of 
> interest.
> 
> public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
> Exception {
>     if (msg instanceof HttpRequest) {
>         HttpRequest req = (HttpRequest) msg;
>         FullHttpRequest fReq = (FullHttpRequest) req;
>         Charset utf8 = CharsetUtil.UTF_8;
>         ByteBuf buf = fReq.content();
>         String in = buf.toString(utf8);
>         buf.clear();
> 
>         postToKafka.write2Kafka(in);
> 
>         if (HttpHeaders.is100ContinueExpected(req)) {
>             ctx.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, 
> HttpResponseStatus.CONTINUE));
>         }
>         boolean keepAlive = HttpHeaders.isKeepAlive(req);
>         FullHttpResponse response = new 
> DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
>                 Unpooled.wrappedBuffer(RESP.getBytes()));
>         response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
>         response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, 
> "*");
>         response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
> response.content().readableBytes());
>         if (!keepAlive) {
>             
> ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
>         } else {
>             response.headers().set(CONNECTION, Values.KEEP_ALIVE);
>             ctx.writeAndFlush(response);
>         }
>         LOG.debug(response.toString());
>     }
> }
> 
> 
> Exception stack trace. 
> 
> [nioEventLoopGroup-3-2] DEBUG com.netty.httpserver.HttpInputHandler - 
> DefaultFullHttpResponse(decodeResult: success, version: HTTP/1.1, content: 
> UnpooledHeapByteBuf(freed))
> io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 1024 
> byte(s) of direct memory (used: 253426695, max: 253427712)
>         at 
> io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:523)
>         at 
> io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:477)
>         at 
> io.netty.buffer.UnpooledUnsafeNoCleanerDirectByteBuf.allocateDirect(UnpooledUnsafeNoCleanerDirectByteBuf.java:30)
>         at 
> io.netty.buffer.UnpooledUnsafeDirectByteBuf.<init>(UnpooledUnsafeDirectByteBuf.java:67)
>         at 
> io.netty.buffer.UnpooledUnsafeNoCleanerDirectByteBuf.<init>(UnpooledUnsafeNoCleanerDirectByteBuf.java:25)
>         at 
> io.netty.buffer.UnsafeByteBufUtil.newUnsafeDirectByteBuf(UnsafeByteBufUtil.java:425)
>         at 
> io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:65)
>         at 
> io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
>         at 
> io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
>         at 
> io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
>         at 
> io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
>         at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
>         at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
>         at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
>         at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
>         at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
>         at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
>         at java.lang.Thread.run(Thread.java:748)
> 
> This Error occurs on all Threads: #1,#2,#3,#4,#5. Finally there's this 
> message:
> 
> [nioEventLoopGroup-3-2] ERROR io.netty.util.ResourceLeakDetector - LEAK: 
> ByteBuf.release() was not called before it's garbage-collected. See 
> http://netty.io/wiki/reference-counted-objects.html for more information.
> WARNING: 3 leak records were discarded because the leak record count is 
> limited to 4. Use system property io.netty.leakDetection.maxRecords to 
> increase the limit.
> Recent access records: 5
> #5:
>         
> io.netty.buffer.AdvancedLeakAwareByteBuf.getBytes(AdvancedLeakAwareByteBuf.java:220)
>         io.netty.buffer.CompositeByteBuf.getBytes(CompositeByteBuf.java:815)
>         io.netty.buffer.CompositeByteBuf.getBytes(CompositeByteBuf.java:43)
>         
> io.netty.buffer.UnpooledHeapByteBuf.setBytes(UnpooledHeapByteBuf.java:227)
>         io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:868)
>         io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:574)
>         io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:979)
>         io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:974)
>         
> com.netty.httpserver.HttpInputHandler.channelRead(HttpInputHandler.java:45)
>         
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
>         
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
>         
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
>         
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
>         
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
>         
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
>         
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
>         
> io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
>         
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
>         
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
>         
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
>         
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
>         
> io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
>         
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
>         
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
>         
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
>         
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>         
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
>         
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
>         
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
>         io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
>         
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
>         
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
>         java.lang.Thread.run(Thread.java:748)
> 
> 
> Created at:
>         
> io.netty.util.ResourceLeakDetector.track(ResourceLeakDetector.java:229)
>         
> io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:69)
>         
> io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
>         
> io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
>         
> io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
>         
> io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
>         
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
>         
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
>         
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
>         
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
>         io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
>         
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
>         
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
>         java.lang.Thread.run(Thread.java:748)
> 
> -- 
> You received this message because you are subscribed to the Google Groups 
> "Netty discussions" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to [email protected].
> To view this discussion on the web visit 
> https://groups.google.com/d/msgid/netty/718c797b-dead-4d90-908f-dafca72af183%40googlegroups.com.
> For more options, visit https://groups.google.com/d/optout.

-- 
You received this message because you are subscribed to the Google Groups 
"Netty discussions" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/netty/1CFD0420-D5D4-4600-9969-B594D9BED03F%40googlemail.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to