@Norman Maurer That fixed the problem.

Please find updated code below:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
        if (msg instanceof HttpRequest) {
            HttpRequest req = (HttpRequest) msg;
            FullHttpRequest fReq = (FullHttpRequest) req;
            Charset utf8 = CharsetUtil.UTF_8;
            ByteBuf buf = fReq.content();
            try {
*                String in = buf.toString(utf8);*
*                buf.clear();*
                ...
            } finally {
*                buf.release();*
            }
        }
    }
}



On Sunday, July 9, 2017 at 1:47:52 AM UTC-7, Norman Maurer wrote:
>
> You need to release the msg that is passed into channelRead(...)
>
> Am 06.07.2017 um 00:52 schrieb [email protected] <javascript:>:
>
> I have a very bare bones version of netty and kafka using netty4 and 
> kafka-clients_0.10.1.1. 
> I am hitting memory leak issues when I load testing my server. 
>
> https://github.com/dhawangayash/netty-kafka-producer/tree/master/netty
>
> Here's how I run the Netty server. 
> java -Dio.netty.recycler.maxCapacity=0 -Dio.netty.leakDetectionLevel=advanced 
> -Dorg.slf4j.simpleLogger.defaultLogLevel=DEBUG 
> -Dbootstrap.servers=localhost:9092 -Dtopic=test_topic -jar 
> target/netty4-httpserver-0.0.1-jar-with-dependencies.jar 
> com.netty.httpserver.HttpServerNetty
>
> When I add the advanced leak detection I am seeing my worker threads are 
> leaking memory when publishing messages to kafka. (I noticed this to 
> increase when my Kafka broker applies backpressure.) Here's the code 
> snippet of interest.
>
> public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
> Exception {
>     if (msg instanceof HttpRequest) {
>         HttpRequest req = (HttpRequest) msg;
>         FullHttpRequest fReq = (FullHttpRequest) req;
>         Charset utf8 = CharsetUtil.UTF_8;
>         ByteBuf buf = fReq.content();
>         String in = buf.toString(utf8);
>         buf.clear();
>
>         postToKafka.write2Kafka(in);
>
>         if (HttpHeaders.is100ContinueExpected(req)) {
>             ctx.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, 
> HttpResponseStatus.CONTINUE));
>         }
>         boolean keepAlive = HttpHeaders.isKeepAlive(req);
>         FullHttpResponse response = new 
> DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
>                 Unpooled.wrappedBuffer(RESP.getBytes()));
>         response.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
> "text/plain");
>         
> response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
>         response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
> response.content().readableBytes());
>         if (!keepAlive) {
>             
> ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
>         } else {
>             response.headers().set(CONNECTION, Values.KEEP_ALIVE);
>             ctx.writeAndFlush(response);
>         }
>         LOG.debug(response.toString());
>     }
> }
>
>
> Exception stack trace. 
>
> [nioEventLoopGroup-3-2] DEBUG com.netty.httpserver.HttpInputHandler - 
> DefaultFullHttpResponse(decodeResult: success, version: HTTP/1.1, content: 
> UnpooledHeapByteBuf(freed))
> io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 1024 
> byte(s) of direct memory (used: 253426695, max: 253427712)
>         at 
> io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:523)
>         at 
> io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:477)
>         at 
> io.netty.buffer.UnpooledUnsafeNoCleanerDirectByteBuf.allocateDirect(UnpooledUnsafeNoCleanerDirectByteBuf.java:30)
>         at 
> io.netty.buffer.UnpooledUnsafeDirectByteBuf.<init>(UnpooledUnsafeDirectByteBuf.java:67)
>         at 
> io.netty.buffer.UnpooledUnsafeNoCleanerDirectByteBuf.<init>(UnpooledUnsafeNoCleanerDirectByteBuf.java:25)
>         at 
> io.netty.buffer.UnsafeByteBufUtil.newUnsafeDirectByteBuf(UnsafeByteBufUtil.java:425)
>         at 
> io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:65)
>         at 
> io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
>         at 
> io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
>         at 
> io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
>         at 
> io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
>         at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
>         at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
>         at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
>         at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
>         at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
>         at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
>         at java.lang.Thread.run(Thread.java:748)
>
> This Error occurs on all Threads: #1,#2,#3,#4,#5. Finally there's this 
> message:
>
> [nioEventLoopGroup-3-2] ERROR io.netty.util.ResourceLeakDetector - LEAK: 
> ByteBuf.release() was not called before it's garbage-collected. See 
> http://netty.io/wiki/reference-counted-objects.html for more information.
> WARNING: 3 leak records were discarded because the leak record count is 
> limited to 4. Use system property io.netty.leakDetection.maxRecords to 
> increase the limit.
> Recent access records: 5
> #5:
>         
> io.netty.buffer.AdvancedLeakAwareByteBuf.getBytes(AdvancedLeakAwareByteBuf.java:220)
>         
> io.netty.buffer.CompositeByteBuf.getBytes(CompositeByteBuf.java:815)
>         io.netty.buffer.CompositeByteBuf.getBytes(CompositeByteBuf.java:43)
>         
> io.netty.buffer.UnpooledHeapByteBuf.setBytes(UnpooledHeapByteBuf.java:227)
>         
> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:868)
>         io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:574)
>         io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:979)
>         io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:974)
>         
> com.netty.httpserver.HttpInputHandler.channelRead(HttpInputHandler.java:45)
>         
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
>         
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
>         
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
>         
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
>         
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
>         
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
>         
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
>         
> io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
>         
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
>         
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
>         
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
>         
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
>         
> io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
>         
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
>         
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
>         
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
>         
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>         
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
>         
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
>         
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
>         io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
>         
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
>         
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
>         java.lang.Thread.run(Thread.java:748)
>
>
> Created at:
>         
> io.netty.util.ResourceLeakDetector.track(ResourceLeakDetector.java:229)
>         
> io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:69)
>         
> io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
>         
> io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
>         
> io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
>         
> io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
>         
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
>         
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
>         
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
>         
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
>         io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
>         
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
>         
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
>         java.lang.Thread.run(Thread.java:748)
>
> -- 
> You received this message because you are subscribed to the Google Groups 
> "Netty discussions" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to [email protected] <javascript:>.
> To view this discussion on the web visit 
> https://groups.google.com/d/msgid/netty/718c797b-dead-4d90-908f-dafca72af183%40googlegroups.com
>  
> <https://groups.google.com/d/msgid/netty/718c797b-dead-4d90-908f-dafca72af183%40googlegroups.com?utm_medium=email&utm_source=footer>
> .
> For more options, visit https://groups.google.com/d/optout.
>
>

-- 
You received this message because you are subscribed to the Google Groups 
"Netty discussions" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/netty/9c3769bc-4ce2-47d3-9de5-10c33f05bf1d%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to