I have a very bare-bones Netty server that publishes to Kafka, using netty4 
and kafka-clients_0.10.1.1. 
I am hitting memory leak issues when I load test my server. 

https://github.com/dhawangayash/netty-kafka-producer/tree/master/netty

Here's how I run the Netty server. 
java -Dio.netty.recycler.maxCapacity=0 -Dio.netty.leakDetectionLevel=advanced \
    -Dorg.slf4j.simpleLogger.defaultLogLevel=DEBUG \
    -Dbootstrap.servers=localhost:9092 -Dtopic=test_topic \
    -jar target/netty4-httpserver-0.0.1-jar-with-dependencies.jar \
    com.netty.httpserver.HttpServerNetty

With advanced leak detection enabled, I see that my worker threads leak 
memory when publishing messages to Kafka. (The leak gets worse when my 
Kafka broker applies backpressure.) Here's the code snippet of interest.

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof HttpRequest) {
        HttpRequest req = (HttpRequest) msg;
        FullHttpRequest fReq = (FullHttpRequest) req;
        Charset utf8 = CharsetUtil.UTF_8;
        ByteBuf buf = fReq.content();
        String in = buf.toString(utf8);
        buf.clear();

        postToKafka.write2Kafka(in);

        if (HttpHeaders.is100ContinueExpected(req)) {
            ctx.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                    HttpResponseStatus.CONTINUE));
        }
        boolean keepAlive = HttpHeaders.isKeepAlive(req);
        FullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
                Unpooled.wrappedBuffer(RESP.getBytes()));
        response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
        response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
        response.headers().set(HttpHeaders.Names.CONTENT_LENGTH,
                response.content().readableBytes());
        if (!keepAlive) {
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        } else {
            response.headers().set(CONNECTION, Values.KEEP_ALIVE);
            ctx.writeAndFlush(response);
        }
        LOG.debug(response.toString());
    }
}


Exception stack trace. 

[nioEventLoopGroup-3-2] DEBUG com.netty.httpserver.HttpInputHandler - DefaultFullHttpResponse(decodeResult: success, version: HTTP/1.1, content: UnpooledHeapByteBuf(freed))
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 1024 byte(s) of direct memory (used: 253426695, max: 253427712)
        at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:523)
        at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:477)
        at io.netty.buffer.UnpooledUnsafeNoCleanerDirectByteBuf.allocateDirect(UnpooledUnsafeNoCleanerDirectByteBuf.java:30)
        at io.netty.buffer.UnpooledUnsafeDirectByteBuf.<init>(UnpooledUnsafeDirectByteBuf.java:67)
        at io.netty.buffer.UnpooledUnsafeNoCleanerDirectByteBuf.<init>(UnpooledUnsafeNoCleanerDirectByteBuf.java:25)
        at io.netty.buffer.UnsafeByteBufUtil.newUnsafeDirectByteBuf(UnsafeByteBufUtil.java:425)
        at io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:65)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
        at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
        at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:748)

This error occurs on all threads: #1, #2, #3, #4, #5. Finally, there's this 
message:

[nioEventLoopGroup-3-2] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
WARNING: 3 leak records were discarded because the leak record count is limited to 4. Use system property io.netty.leakDetection.maxRecords to increase the limit.
Recent access records: 5
#5:
        io.netty.buffer.AdvancedLeakAwareByteBuf.getBytes(AdvancedLeakAwareByteBuf.java:220)
        io.netty.buffer.CompositeByteBuf.getBytes(CompositeByteBuf.java:815)
        io.netty.buffer.CompositeByteBuf.getBytes(CompositeByteBuf.java:43)
        io.netty.buffer.UnpooledHeapByteBuf.setBytes(UnpooledHeapByteBuf.java:227)
        io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:868)
        io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:574)
        io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:979)
        io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:974)
        com.netty.httpserver.HttpInputHandler.channelRead(HttpInputHandler.java:45)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
        io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
        io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
        io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
        io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
        io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
        io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
        io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
        io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
        io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        java.lang.Thread.run(Thread.java:748)


Created at:
        io.netty.util.ResourceLeakDetector.track(ResourceLeakDetector.java:229)
        io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:69)
        io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
        io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
        io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
        io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
        io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
        io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
        io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
        io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
        io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
        io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        java.lang.Thread.run(Thread.java:748)
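
For reference, the LEAK report above concerns Netty's reference-counting 
contract: whichever handler consumes an inbound message last is expected to 
release it. Below is a minimal, stdlib-only sketch of that contract, not 
Netty code. CountedBuffer, leakyHandle and releasingHandle are hypothetical 
names made up for illustration. In a real Netty handler the analogous step 
would presumably be ReferenceCountUtil.release(msg) in a finally block, or 
extending SimpleChannelInboundHandler, which releases automatically. Whether 
this is the actual cause of the leak here is an assumption.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RefCountSketch {

    /** Hypothetical stand-in for a reference-counted buffer; this is NOT Netty's ByteBuf. */
    static final class CountedBuffer {
        // A freshly created buffer starts with a reference count of 1.
        private final AtomicInteger refCnt = new AtomicInteger(1);
        private final String payload;

        CountedBuffer(String payload) {
            this.payload = payload;
        }

        /** Reads the payload; fails if the buffer has already been released. */
        String readAll() {
            if (refCnt.get() <= 0) {
                throw new IllegalStateException("buffer already released");
            }
            return payload;
        }

        /** Decrements the count; returns true once it reaches 0 (memory would be freed). */
        boolean release() {
            int remaining = refCnt.decrementAndGet();
            if (remaining < 0) {
                throw new IllegalStateException("released too many times");
            }
            return remaining == 0;
        }

        int refCnt() {
            return refCnt.get();
        }
    }

    /** Same shape as the handler in this post: copies the body out but never releases. */
    static String leakyHandle(CountedBuffer msg) {
        return msg.readAll();
    }

    /** The last consumer releases in finally, so the count drops even if processing throws. */
    static String releasingHandle(CountedBuffer msg) {
        try {
            return msg.readAll();
        } finally {
            msg.release();
        }
    }

    public static void main(String[] args) {
        CountedBuffer a = new CountedBuffer("hello");
        leakyHandle(a);
        System.out.println("leaky refCnt = " + a.refCnt());     // prints "leaky refCnt = 1"

        CountedBuffer b = new CountedBuffer("hello");
        releasingHandle(b);
        System.out.println("releasing refCnt = " + b.refCnt()); // prints "releasing refCnt = 0"
    }
}
```

In this model the first buffer keeps a count of 1 after handling, which is 
the state a leak detector would report once the object is garbage-collected.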
