Repository: camel Updated Branches: refs/heads/camel-2.18.x e6256ae9d -> 3f346ea84
CAMEL-10409 Double release of netty buffer with thanks to Vitalii Conflicts: components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/27b572dc Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/27b572dc Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/27b572dc Branch: refs/heads/camel-2.18.x Commit: 27b572dc10f3cb61e9ae77f6f42f7fc29c0ea6d4 Parents: e6256ae Author: Willem Jiang <willem.ji...@gmail.com> Authored: Tue Nov 15 10:04:41 2016 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Tue Nov 15 10:56:19 2016 +0800 ---------------------------------------------------------------------- .../netty4/http/NettyHttpProducer.java | 14 ----- .../component/netty4/http/BaseNettyTest.java | 25 ++++++++ .../netty4/http/LogCaptureAppender.java | 65 ++++++++++++++++++++ .../component/netty4/http/LogCaptureTest.java | 35 +++++++++++ .../src/test/resources/log4j2.properties | 7 +++ .../camel/component/netty4/NettyProducer.java | 43 +++++++++++-- 6 files changed, 169 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/27b572dc/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java index 8a6b540..0be479a 100644 --- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java +++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java @@ -74,20 +74,6 @@ public 
class NettyHttpProducer extends NettyProducer { // Need to remove the Host key as it should be not used when bridging/proxying exchange.getIn().removeHeader("host"); } - - // need to release the request when we are done - exchange.addOnCompletion(new SynchronizationAdapter() { - @Override - public void onDone(Exchange exchange) { - if (request instanceof ReferenceCounted) { - if (((ReferenceCounted) request).refCnt() > 0) { - log.debug("Releasing Netty HttpRequest ByteBuf"); - ReferenceCountUtil.release(request); - } - } - } - }); - return request; } http://git-wip-us.apache.org/repos/asf/camel/blob/27b572dc/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/BaseNettyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/BaseNettyTest.java b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/BaseNettyTest.java index c511d89..75916e5 100644 --- a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/BaseNettyTest.java +++ b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/BaseNettyTest.java @@ -18,14 +18,19 @@ package org.apache.camel.component.netty4.http; import java.io.File; import java.io.FileOutputStream; +import java.util.Collection; import java.util.Properties; +import io.netty.buffer.ByteBufAllocator; +import io.netty.util.ResourceLeakDetector; + import org.apache.camel.CamelContext; import org.apache.camel.component.properties.PropertiesComponent; import org.apache.camel.converter.IOConverter; import org.apache.camel.impl.JndiRegistry; import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.logging.log4j.core.LogEvent; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -65,6 +70,26 @@ public class BaseNettyTest extends CamelTestSupport { } } 
+ @BeforeClass + public static void startLeakDetection() { + System.setProperty("io.netty.leakDetection.maxRecords", "100"); + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); + } + + @AfterClass + public static void verifyNoLeaks() throws Exception { + //Force GC to bring up leaks + System.gc(); + //Kick leak detection logging + ByteBufAllocator.DEFAULT.buffer(1).release(); + Collection<LogEvent> events = LogCaptureAppender.getEvents(); + if (!events.isEmpty()) { + String message = "Leaks detected while running tests: " + events; + LogCaptureAppender.reset(); + throw new AssertionError(message); + } + } + @Override protected CamelContext createCamelContext() throws Exception { CamelContext context = super.createCamelContext(); http://git-wip-us.apache.org/repos/asf/camel/blob/27b572dc/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/LogCaptureAppender.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/LogCaptureAppender.java b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/LogCaptureAppender.java new file mode 100644 index 0000000..ef836b6 --- /dev/null +++ b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/LogCaptureAppender.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty4.http; + +import java.io.Serializable; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Deque; + +import org.apache.logging.log4j.core.Filter; +import org.apache.logging.log4j.core.Layout; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginAttribute; +import org.apache.logging.log4j.core.config.plugins.PluginElement; +import org.apache.logging.log4j.core.config.plugins.PluginFactory; + +/** + */ +@Plugin(name = "LogCaptureAppender", category = "Core", elementType = "appender", printObject = true) +public class LogCaptureAppender extends AbstractAppender { + private static final Deque<LogEvent> LOG_EVENTS = new ArrayDeque<>(); + + public LogCaptureAppender(String name, Filter filter, Layout<? extends Serializable> layout) { + super(name, filter, layout); + } + + public LogCaptureAppender(String name, Filter filter, Layout<? 
extends Serializable> layout, boolean ignoreExceptions) { + super(name, filter, layout, ignoreExceptions); + } + + @PluginFactory + public static LogCaptureAppender createAppender(@PluginAttribute("name") final String name, + @PluginElement("Filter") final Filter filter) { + return new LogCaptureAppender(name, filter, null); + } + + @Override + public void append(LogEvent logEvent) { + LOG_EVENTS.add(logEvent); + } + + public static void reset() { + LOG_EVENTS.clear(); + } + + public static Collection<LogEvent> getEvents() { + return LOG_EVENTS; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/27b572dc/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/LogCaptureTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/LogCaptureTest.java b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/LogCaptureTest.java new file mode 100644 index 0000000..dfa38d2 --- /dev/null +++ b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/LogCaptureTest.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty4.http; + +import io.netty.util.ResourceLeakDetector; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import org.junit.Assert; +import org.junit.Test; + +/** + * This test ensures LogCaptureAppender is configured properly + */ +public class LogCaptureTest { + @Test + public void testCapture() { + InternalLoggerFactory.getInstance(ResourceLeakDetector.class).error("testError"); + Assert.assertFalse(LogCaptureAppender.getEvents().isEmpty()); + LogCaptureAppender.reset(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/27b572dc/components/camel-netty4-http/src/test/resources/log4j2.properties ---------------------------------------------------------------------- diff --git a/components/camel-netty4-http/src/test/resources/log4j2.properties b/components/camel-netty4-http/src/test/resources/log4j2.properties index c5a234a..ce7b6d3 100644 --- a/components/camel-netty4-http/src/test/resources/log4j2.properties +++ b/components/camel-netty4-http/src/test/resources/log4j2.properties @@ -15,6 +15,7 @@ ## limitations under the License. 
## --------------------------------------------------------------------------- +configuration.packages=org.apache.camel.component.netty4.http appender.file.type = File appender.file.name = file appender.file.fileName = target/camel-netty4-http-test.log @@ -24,5 +25,11 @@ appender.out.type = Console appender.out.name = out appender.out.layout.type = PatternLayout appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n +appender.capture.type=LogCaptureAppender +appender.capture.name=capture + +logger.leak.name = io.netty.util.ResourceLeakDetector +logger.leak.appenderRef.capture.ref = capture + rootLogger.level = INFO rootLogger.appenderRef.file.ref = file http://git-wip-us.apache.org/repos/asf/camel/blob/27b572dc/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java index 273220c..a375681 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java @@ -38,6 +38,7 @@ import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.ImmediateEventExecutor; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; @@ -193,11 +194,15 @@ public class NettyProducer extends DefaultAsyncProducer { callback.done(true); return true; } + return processWithBody(exchange, body, new BodyReleaseCallback(callback, body)); } catch (Exception e) { exchange.setException(e); callback.done(true); return 
true; } + } + + private boolean processWithBody(final Exchange exchange, Object body, BodyReleaseCallback callback) { // set the exchange encoding property if (getConfiguration().getCharsetName() != null) { @@ -240,7 +245,7 @@ public class NettyProducer extends DefaultAsyncProducer { return false; } - public void processWithConnectedChannel(final Exchange exchange, final AsyncCallback callback, final ChannelFuture channelFuture, final Object body) { + public void processWithConnectedChannel(final Exchange exchange, final BodyReleaseCallback callback, final ChannelFuture channelFuture, final Object body) { // remember channel so we can reuse it final Channel channel = channelFuture.channel(); if (getConfiguration().isReuseChannel() && exchange.getProperty(NettyConstants.NETTY_CHANNEL) == null) { @@ -283,15 +288,16 @@ public class NettyProducer extends DefaultAsyncProducer { channel.pipeline().replace(oldHandler, "timeout", newHandler); } } - + + //This will refer to original callback since netty will release body by itself final AsyncCallback producerCallback; if (configuration.isReuseChannel()) { // use callback as-is because we should not put it back in the pool as NettyProducerCallback would do // as when reuse channel is enabled it will put the channel back in the pool when exchange is done using on completion - producerCallback = callback; + producerCallback = callback.getOriginalCallback(); } else { - producerCallback = new NettyProducerCallback(channelFuture, callback); + producerCallback = new NettyProducerCallback(channelFuture, callback.getOriginalCallback()); } // setup state as attachment on the channel, so we can access the state later when needed @@ -618,10 +624,10 @@ public class NettyProducer extends DefaultAsyncProducer { */ private class ChannelConnectedListener implements ChannelFutureListener { private final Exchange exchange; - private final AsyncCallback callback; + private final BodyReleaseCallback callback; private final Object body; - 
+ ChannelConnectedListener(Exchange exchange, AsyncCallback callback, Object body) { + ChannelConnectedListener(Exchange exchange, BodyReleaseCallback callback, Object body) { this.exchange = exchange; this.callback = callback; this.body = body; @@ -636,6 +642,7 @@ public class NettyProducer extends DefaultAsyncProducer { } exchange.setException(cause); callback.done(false); + return; } try { @@ -646,4 +653,28 @@ public class NettyProducer extends DefaultAsyncProducer { } } } + + /** + * This class is used to release body in case when some error occurred and body was not handed over + * to netty + */ + private static final class BodyReleaseCallback implements AsyncCallback { + private volatile Object body; + private final AsyncCallback originalCallback; + + private BodyReleaseCallback(AsyncCallback originalCallback, Object body) { + this.body = body; + this.originalCallback = originalCallback; + } + + public AsyncCallback getOriginalCallback() { + return originalCallback; + } + + @Override + public void done(boolean doneSync) { + ReferenceCountUtil.release(body); + originalCallback.done(doneSync); + } + } }