This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-2.24.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 77c0f4f92084ee31ab55c148389dba8222be105f Author: Luigi De Masi <ldem...@redhat.com> AuthorDate: Mon Nov 4 23:30:23 2019 +0100 CAMEL-14137 Thread leak in camel-jetty component if maxThreads or minThreads property is set --- .../camel/component/jetty/JettyHttpComponent.java | 29 +++++++--- .../component/jetty/JettyThreadPoolSizeTest.java | 67 ++++++++++++++++++++++ 2 files changed, 87 insertions(+), 9 deletions(-) diff --git a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java index bad5862..4e738f2 100644 --- a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java +++ b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java @@ -144,6 +144,7 @@ public abstract class JettyHttpComponent extends HttpCommonComponent implements protected boolean useXForwardedForHeader; private Integer proxyPort; private boolean sendServerVersion = true; + private QueuedThreadPool defaultQueuedThreadPool; public JettyHttpComponent() { super(JettyHttpEndpoint.class); @@ -521,6 +522,15 @@ public abstract class JettyHttpComponent extends HttpCommonComponent implements this.removeServerMBean(connectorRef.server); //mbContainer.removeBean(connectorRef.connector); } + if (defaultQueuedThreadPool != null) { + try { + defaultQueuedThreadPool.stop(); + } catch (Throwable t) { + defaultQueuedThreadPool.destroy(); + } finally { + defaultQueuedThreadPool = null; + } + } } } } @@ -1399,20 +1409,21 @@ public abstract class JettyHttpComponent extends HttpCommonComponent implements protected Server createServer() { Server s = null; ThreadPool tp = threadPool; - QueuedThreadPool qtp = null; + defaultQueuedThreadPool = null; // configure thread pool if min/max given if (minThreads != null || maxThreads != null) { if (getThreadPool() != null) { throw new 
IllegalArgumentException("You cannot configure both minThreads/maxThreads and a custom threadPool on JettyHttpComponent: " + this); } - qtp = new QueuedThreadPool(); + defaultQueuedThreadPool = new QueuedThreadPool(); if (minThreads != null) { - qtp.setMinThreads(minThreads.intValue()); + defaultQueuedThreadPool.setMinThreads(minThreads.intValue()); } if (maxThreads != null) { - qtp.setMaxThreads(maxThreads.intValue()); + defaultQueuedThreadPool.setMaxThreads(maxThreads.intValue()); } - tp = qtp; + tp = defaultQueuedThreadPool; + } if (tp != null) { try { @@ -1433,13 +1444,13 @@ public abstract class JettyHttpComponent extends HttpCommonComponent implements if (s == null) { s = new Server(); } - if (qtp != null) { + if (defaultQueuedThreadPool != null) { // let the thread names indicate they are from the server - qtp.setName("CamelJettyServer(" + ObjectHelper.getIdentityHashCode(s) + ")"); + defaultQueuedThreadPool.setName("CamelJettyServer(" + ObjectHelper.getIdentityHashCode(s) + ")"); try { - qtp.start(); + defaultQueuedThreadPool.start(); } catch (Exception e) { - throw new RuntimeCamelException("Error starting JettyServer thread pool: " + qtp, e); + throw new RuntimeCamelException("Error starting JettyServer thread pool: " + defaultQueuedThreadPool, e); } } ContextHandlerCollection collection = new ContextHandlerCollection(); diff --git a/components/camel-jetty9/src/test/java/org/apache/camel/component/jetty/JettyThreadPoolSizeTest.java b/components/camel-jetty9/src/test/java/org/apache/camel/component/jetty/JettyThreadPoolSizeTest.java new file mode 100644 index 0000000..0f6fc62 --- /dev/null +++ b/components/camel-jetty9/src/test/java/org/apache/camel/component/jetty/JettyThreadPoolSizeTest.java @@ -0,0 +1,67 @@ +package org.apache.camel.component.jetty; + +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; + +public class JettyThreadPoolSizeTest extends 
BaseJettyTest { + + + private static final Logger LOG = LoggerFactory.getLogger(JettyThreadPoolSizeTest.class); + + @Test + public void threadPoolTest() throws Exception { + + long initialJettyThreadNumber = countJettyThread(); + + LOG.info("initial Jetty thread number (expected 5): " + initialJettyThreadNumber); + + context.stop(); + + long jettyThreadNumberAfterStop = countJettyThread(); + + LOG.info("Jetty thread number after stopping Camel Context: (expected 0): " + jettyThreadNumberAfterStop); + + JettyHttpComponent jettyComponent = (JettyHttpComponent)context.getComponent("jetty"); + jettyComponent.setMinThreads(5); + jettyComponent.setMaxThreads(5); + + context.start(); + + long jettyThreadNumberAfterRestart = countJettyThread(); + + LOG.info("Jetty thread number after starting Camel Context: (expected 5): "+ jettyThreadNumberAfterRestart); + + assertEquals(5L, initialJettyThreadNumber); + + assertEquals(0L, jettyThreadNumberAfterStop); + + assertEquals(5L, jettyThreadNumberAfterRestart); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // setup the jetty component with the custom minThreads + JettyHttpComponent jettyComponent = (JettyHttpComponent)context.getComponent("jetty"); + jettyComponent.setMinThreads(5); + jettyComponent.setMaxThreads(5); + + from("jetty://http://localhost:{{port}}/myserverWithCustomPoolSize").to("mock:result"); + } + }; + } + + private long countJettyThread() { + + Set<Thread> threadSet = Thread.getAllStackTraces().keySet(); + return threadSet.stream().filter(thread -> thread.getName().contains("CamelJettyServer")).count(); + } + +}