@Norman Maurer That fixed the problem.
Please find the updated code below:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof HttpRequest) {
        HttpRequest req = (HttpRequest) msg;
        FullHttpRequest fReq = (FullHttpRequest) req;
        Charset utf8 = CharsetUtil.UTF_8;
        ByteBuf buf = fReq.content();
        try {
            String in = buf.toString(utf8);
            buf.clear();
            ...
        } finally {
            // Always release the request content, even if processing throws.
            buf.release();
        }
    }
}
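
For anyone who finds this thread later, here is a minimal, dependency-free sketch of why the release belongs in a finally block. It does not use Netty: CountedBuffer, Sink, and handle are hypothetical names invented for this illustration, standing in for a reference-counted buffer and for a downstream step (such as a Kafka publish) that may throw. In real Netty code the corresponding call would be buf.release() as in the handler above, or ReferenceCountUtil.release(msg).

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

public class ReleaseSketch {

    /** Hypothetical stand-in for a reference-counted buffer; not Netty's ByteBuf. */
    static final class CountedBuffer {
        private final byte[] data;
        // The count starts at 1: the creator (or whoever receives it) owns one reference.
        private final AtomicInteger refCnt = new AtomicInteger(1);

        CountedBuffer(String text) {
            this.data = text.getBytes(StandardCharsets.UTF_8);
        }

        /** Decodes the content; fails if the buffer was already released. */
        String toString(Charset cs) {
            if (refCnt.get() <= 0) {
                throw new IllegalStateException("buffer already released");
            }
            return new String(data, cs);
        }

        /** Drops one reference; returns true when the count reaches zero. */
        boolean release() {
            int remaining = refCnt.decrementAndGet();
            if (remaining < 0) {
                throw new IllegalStateException("released too many times");
            }
            return remaining == 0;
        }

        int refCnt() {
            return refCnt.get();
        }
    }

    /** Hypothetical downstream step that may fail, e.g. a publish under backpressure. */
    interface Sink {
        void accept(String payload) throws Exception;
    }

    /** Reads the buffer, hands the text to the sink, and always releases the buffer. */
    static void handle(CountedBuffer buf, Sink sink) throws Exception {
        try {
            sink.accept(buf.toString(StandardCharsets.UTF_8));
        } finally {
            buf.release(); // runs on success and on failure
        }
    }

    public static void main(String[] args) {
        CountedBuffer ok = new CountedBuffer("hello");
        CountedBuffer failing = new CountedBuffer("world");
        try {
            handle(ok, payload -> { });
            handle(failing, payload -> {
                throw new IllegalStateException("sink unavailable");
            });
        } catch (Exception expected) {
            // The failure is expected here; the buffer must still have been released.
        }
        System.out.println(ok.refCnt() + " " + failing.refCnt());
    }
}
```

Running main prints "0 0": both buffers end with a reference count of zero, including the one whose sink threw. Without the finally block, the second buffer would keep its reference, which is the kind of leak the detector reported.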
On Sunday, July 9, 2017 at 1:47:52 AM UTC-7, Norman Maurer wrote:
>
> You need to release the msg that is passed into channelRead(...)
>
> On 06.07.2017 at 00:52, [email protected] wrote:
>
> I have a very bare-bones integration of Netty and Kafka using netty4 and
> kafka-clients_0.10.1.1.
> I am hitting memory leak issues when I load test my server.
>
> https://github.com/dhawangayash/netty-kafka-producer/tree/master/netty
>
> Here's how I run the Netty server.
> java -Dio.netty.recycler.maxCapacity=0 -Dio.netty.leakDetectionLevel=advanced
> -Dorg.slf4j.simpleLogger.defaultLogLevel=DEBUG
> -Dbootstrap.servers=localhost:9092 -Dtopic=test_topic -jar
> target/netty4-httpserver-0.0.1-jar-with-dependencies.jar
> com.netty.httpserver.HttpServerNetty
>
> With advanced leak detection enabled, I see that my worker threads are
> leaking memory when publishing messages to Kafka. (The leak appears to
> grow when my Kafka broker applies backpressure.) Here's the code
> snippet of interest.
>
> public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
>     if (msg instanceof HttpRequest) {
>         HttpRequest req = (HttpRequest) msg;
>         FullHttpRequest fReq = (FullHttpRequest) req;
>         Charset utf8 = CharsetUtil.UTF_8;
>         ByteBuf buf = fReq.content();
>         String in = buf.toString(utf8);
>         buf.clear();
>
>         postToKafka.write2Kafka(in);
>
>         if (HttpHeaders.is100ContinueExpected(req)) {
>             ctx.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
>                     HttpResponseStatus.CONTINUE));
>         }
>         boolean keepAlive = HttpHeaders.isKeepAlive(req);
>         FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
>                 HttpResponseStatus.OK, Unpooled.wrappedBuffer(RESP.getBytes()));
>         response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
>         response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
>         response.headers().set(HttpHeaders.Names.CONTENT_LENGTH,
>                 response.content().readableBytes());
>         if (!keepAlive) {
>             ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
>         } else {
>             response.headers().set(CONNECTION, Values.KEEP_ALIVE);
>             ctx.writeAndFlush(response);
>         }
>         LOG.debug(response.toString());
>     }
> }
>
>
> Exception stack trace.
>
> [nioEventLoopGroup-3-2] DEBUG com.netty.httpserver.HttpInputHandler - DefaultFullHttpResponse(decodeResult: success, version: HTTP/1.1, content: UnpooledHeapByteBuf(freed))
> io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 1024 byte(s) of direct memory (used: 253426695, max: 253427712)
>     at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:523)
>     at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:477)
>     at io.netty.buffer.UnpooledUnsafeNoCleanerDirectByteBuf.allocateDirect(UnpooledUnsafeNoCleanerDirectByteBuf.java:30)
>     at io.netty.buffer.UnpooledUnsafeDirectByteBuf.<init>(UnpooledUnsafeDirectByteBuf.java:67)
>     at io.netty.buffer.UnpooledUnsafeNoCleanerDirectByteBuf.<init>(UnpooledUnsafeNoCleanerDirectByteBuf.java:25)
>     at io.netty.buffer.UnsafeByteBufUtil.newUnsafeDirectByteBuf(UnsafeByteBufUtil.java:425)
>     at io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:65)
>     at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
>     at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
>     at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
>     at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
>     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
>     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
>     at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
>     at java.lang.Thread.run(Thread.java:748)
>
> This error occurs on all threads (#1 through #5). Finally, there is this
> message:
>
> [nioEventLoopGroup-3-2] ERROR io.netty.util.ResourceLeakDetector - LEAK:
> ByteBuf.release() was not called before it's garbage-collected. See
> http://netty.io/wiki/reference-counted-objects.html for more information.
> WARNING: 3 leak records were discarded because the leak record count is
> limited to 4. Use system property io.netty.leakDetection.maxRecords to
> increase the limit.
> Recent access records: 5
> #5:
>     io.netty.buffer.AdvancedLeakAwareByteBuf.getBytes(AdvancedLeakAwareByteBuf.java:220)
>     io.netty.buffer.CompositeByteBuf.getBytes(CompositeByteBuf.java:815)
>     io.netty.buffer.CompositeByteBuf.getBytes(CompositeByteBuf.java:43)
>     io.netty.buffer.UnpooledHeapByteBuf.setBytes(UnpooledHeapByteBuf.java:227)
>     io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:868)
>     io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:574)
>     io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:979)
>     io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:974)
>     com.netty.httpserver.HttpInputHandler.channelRead(HttpInputHandler.java:45)
>     io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
>     io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
>     io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
>     io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
>     io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
>     io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
>     io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
>     io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
>     io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
>     io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
>     io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
>     io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
>     io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
>     io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
>     io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
>     io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
>     io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>     io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
>     io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
>     io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
>     io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
>     io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
>     io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
>     java.lang.Thread.run(Thread.java:748)
>
> Created at:
>     io.netty.util.ResourceLeakDetector.track(ResourceLeakDetector.java:229)
>     io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:69)
>     io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
>     io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
>     io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
>     io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
>     io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
>     io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
>     io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
>     io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
>     io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
>     io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
>     io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
>     java.lang.Thread.run(Thread.java:748)
>
> --
> You received this message because you are subscribed to the Google Groups
> "Netty discussions" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/netty/718c797b-dead-4d90-908f-dafca72af183%40googlegroups.com.
> For more options, visit https://groups.google.com/d/optout.
>
>