You need to release the msg that is passed into channelRead(...).

> On 06.07.2017 at 00:52, [email protected] wrote:
>
> I have a very bare-bones version of netty and kafka using netty4 and
> kafka-clients_0.10.1.1.
> I am hitting memory leak issues when I load test my server.
>
> https://github.com/dhawangayash/netty-kafka-producer/tree/master/netty
>
> Here's how I run the Netty server.
> java -Dio.netty.recycler.maxCapacity=0 -Dio.netty.leakDetectionLevel=advanced
> -Dorg.slf4j.simpleLogger.defaultLogLevel=DEBUG
> -Dbootstrap.servers=localhost:9092 -Dtopic=test_topic -jar
> target/netty4-httpserver-0.0.1-jar-with-dependencies.jar
> com.netty.httpserver.HttpServerNetty
>
> When I add the advanced leak detection I see that my worker threads are
> leaking memory when publishing messages to kafka. (I noticed this increases
> when my Kafka broker applies backpressure.) Here's the code snippet of
> interest.
>
> public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
>     if (msg instanceof HttpRequest) {
>         HttpRequest req = (HttpRequest) msg;
>         FullHttpRequest fReq = (FullHttpRequest) req;
>         Charset utf8 = CharsetUtil.UTF_8;
>         ByteBuf buf = fReq.content();
>         String in = buf.toString(utf8);
>         buf.clear();
>
>         postToKafka.write2Kafka(in);
>
>         if (HttpHeaders.is100ContinueExpected(req)) {
>             ctx.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
>                     HttpResponseStatus.CONTINUE));
>         }
>         boolean keepAlive = HttpHeaders.isKeepAlive(req);
>         FullHttpResponse response = new DefaultFullHttpResponse(
>                 HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
>                 Unpooled.wrappedBuffer(RESP.getBytes()));
>         response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
>         response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
>         response.headers().set(HttpHeaders.Names.CONTENT_LENGTH,
>                 response.content().readableBytes());
>         if (!keepAlive) {
>             ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
>         } else {
>             response.headers().set(CONNECTION, Values.KEEP_ALIVE);
>             ctx.writeAndFlush(response);
>         }
>         LOG.debug(response.toString());
>     }
> }
>
>
> Exception stack trace.
>
> [nioEventLoopGroup-3-2] DEBUG com.netty.httpserver.HttpInputHandler -
> DefaultFullHttpResponse(decodeResult: success, version: HTTP/1.1, content:
> UnpooledHeapByteBuf(freed))
> io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 1024
> byte(s) of direct memory (used: 253426695, max: 253427712)
>     at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:523)
>     at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:477)
>     at io.netty.buffer.UnpooledUnsafeNoCleanerDirectByteBuf.allocateDirect(UnpooledUnsafeNoCleanerDirectByteBuf.java:30)
>     at io.netty.buffer.UnpooledUnsafeDirectByteBuf.<init>(UnpooledUnsafeDirectByteBuf.java:67)
>     at io.netty.buffer.UnpooledUnsafeNoCleanerDirectByteBuf.<init>(UnpooledUnsafeNoCleanerDirectByteBuf.java:25)
>     at io.netty.buffer.UnsafeByteBufUtil.newUnsafeDirectByteBuf(UnsafeByteBufUtil.java:425)
>     at io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:65)
>     at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
>     at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
>     at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
>     at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
>     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
>     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
>     at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
>     at java.lang.Thread.run(Thread.java:748)
>
> This Error occurs on all Threads: #1,#2,#3,#4,#5. Finally there's this
> message:
>
> [nioEventLoopGroup-3-2] ERROR io.netty.util.ResourceLeakDetector - LEAK:
> ByteBuf.release() was not called before it's garbage-collected. See
> http://netty.io/wiki/reference-counted-objects.html for more information.
> WARNING: 3 leak records were discarded because the leak record count is
> limited to 4. Use system property io.netty.leakDetection.maxRecords to
> increase the limit.
> Recent access records: 5
> #5:
>     io.netty.buffer.AdvancedLeakAwareByteBuf.getBytes(AdvancedLeakAwareByteBuf.java:220)
>     io.netty.buffer.CompositeByteBuf.getBytes(CompositeByteBuf.java:815)
>     io.netty.buffer.CompositeByteBuf.getBytes(CompositeByteBuf.java:43)
>     io.netty.buffer.UnpooledHeapByteBuf.setBytes(UnpooledHeapByteBuf.java:227)
>     io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:868)
>     io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:574)
>     io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:979)
>     io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:974)
>     com.netty.httpserver.HttpInputHandler.channelRead(HttpInputHandler.java:45)
>     io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
>     io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
>     io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
>     io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
>     io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
>     io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
>     io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
>     io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
>     io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
>     io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
>     io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
>     io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
>     io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
>     io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
>     io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
>     io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
>     io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>     io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
>     io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
>     io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
>     io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
>     io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
>     io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
>     java.lang.Thread.run(Thread.java:748)
>
>
> Created at:
>     io.netty.util.ResourceLeakDetector.track(ResourceLeakDetector.java:229)
>     io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:69)
>     io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
>     io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
>     io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
>     io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
>     io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
>     io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
>     io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
>     io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
>     io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
>     io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
>     io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
>     java.lang.Thread.run(Thread.java:748)
>
> --
> You received this message because you are subscribed to the Google Groups
> "Netty discussions" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/netty/718c797b-dead-4d90-908f-dafca72af183%40googlegroups.com.
> For more options, visit https://groups.google.com/d/optout.
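To make the advice at the top of this reply concrete: in Netty terms, the usual shape is to wrap the body of channelRead(...) in try/finally and call ReferenceCountUtil.release(msg) in the finally block, or to extend SimpleChannelInboundHandler, which by default releases the message once channelRead0(...) returns. Note that buf.clear() in the quoted handler only resets the reader and writer indices; it does not free anything. The block below is a dependency-free sketch of that release-in-finally pattern, not Netty code: CountedBuf and ReleaseInFinally are hypothetical stand-ins that only model the reference count.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a reference-counted buffer; not a Netty class.
final class CountedBuf {
    private final byte[] data;
    private int refCnt = 1; // a freshly created buffer holds one reference

    CountedBuf(String text) {
        this.data = text.getBytes(StandardCharsets.UTF_8);
    }

    // Decodes the content; fails if the buffer was already released.
    String readUtf8() {
        if (refCnt == 0) {
            throw new IllegalStateException("buffer already released");
        }
        return new String(data, StandardCharsets.UTF_8);
    }

    // Drops one reference; returns true once the count reaches zero.
    boolean release() {
        if (refCnt == 0) {
            throw new IllegalStateException("released too many times");
        }
        refCnt--;
        return refCnt == 0;
    }

    int refCnt() {
        return refCnt;
    }
}

final class ReleaseInFinally {
    // Models a channelRead that is the last consumer of msg: whatever
    // happens while processing, the reference is dropped in finally.
    static String handle(CountedBuf msg) {
        try {
            return msg.readUtf8(); // copy out what is needed first
        } finally {
            msg.release();
        }
    }

    public static void main(String[] args) {
        CountedBuf msg = new CountedBuf("payload");
        String body = handle(msg);
        System.out.println(body + " refCnt=" + msg.refCnt());
    }
}
```

Copying the body into a String before the finally block runs matters: once the count reaches zero the buffer must not be touched again, so anything handed on to another thread (for example a Kafka producer callback) should be the copy, not the buffer.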
