This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch CAMEL-15197v2
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e8152d88f2981409a59d4d0f1bdf27fc2a5a0962
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon Jun 29 10:43:06 2020 +0200

    CAMEL-15197: Thread pool can use vertx executor to reuse its threading 
model.
---
 apache-camel/src/main/descriptors/common-bin.xml   |   1 +
 bom/camel-bom/pom.xml                              |   5 +
 .../org/apache/camel/catalog/docs.properties       |   1 +
 .../docs/reactive-threadpoolfactory-vertx.adoc     |  64 ++++++++
 .../org/apache/camel/catalog/others.properties     |   1 +
 .../catalog/others/threadpoolfactory-vertx.json    |  15 ++
 .../src/main/docs/reactive-executor-vertx.adoc     |  23 +--
 .../reactive/vertx/VertXReactiveExecutor.java      |  54 ++++---
 ...ockTest.java => SimpleMockLookupVertxTest.java} |  12 +-
 .../org/apache/camel/reactive/SimpleMockTest.java  |  13 +-
 components/camel-threadpoolfactory-vertx/pom.xml   |  77 +++++++++
 .../services/org/apache/camel/other.properties     |   7 +
 .../services/org/apache/camel/reactive-executor    |   2 +
 .../services/org/apache/camel/thread-pool-factory  |   2 +
 .../resources/reactive-executor-vertx.json         |  15 ++
 .../resources/threadpoolfactory-vertx.json         |  15 ++
 .../docs/reactive-threadpoolfactory-vertx.adoc     |  64 ++++++++
 .../reactive/vertx/VertXThreadPoolFactory.java     | 179 +++++++++++++++++++++
 .../camel/reactive/SplitCustomThreadPoolTest.java  |  79 +++++++++
 .../reactive/SplitParallelLookupVertxTest.java}    |  46 +++---
 .../apache/camel/reactive/SplitParallelTest.java}  |  50 +++---
 .../src/test/resources/log4j2.properties           |  31 ++++
 components/pom.xml                                 |   1 +
 .../apache/camel/spi/SimpleExecutorService.java    |  11 ++
 .../org/apache/camel/spi/ThreadPoolFactory.java    |   5 +
 .../camel/impl/engine/AbstractCamelContext.java    |  15 +-
 .../impl/engine/BaseExecutorServiceManager.java    |  25 ++-
 .../DefaultAnnotationBasedProcessorFactory.java    |   2 +-
 .../camel/impl/engine/DefaultProducerTemplate.java |   3 +-
 .../camel/impl/engine/DefaultShutdownStrategy.java |   9 +-
 .../processor/aggregate/AggregateProcessor.java    |   2 +-
 .../camel/support/DefaultThreadPoolFactory.java    |  18 ++-
 docs/components/modules/others/nav.adoc            |   1 +
 .../pages/reactive-threadpoolfactory-vertx.adoc    |  66 ++++++++
 parent/pom.xml                                     |   5 +
 35 files changed, 813 insertions(+), 106 deletions(-)

diff --git a/apache-camel/src/main/descriptors/common-bin.xml 
b/apache-camel/src/main/descriptors/common-bin.xml
index 923e39e..b54a518 100644
--- a/apache-camel/src/main/descriptors/common-bin.xml
+++ b/apache-camel/src/main/descriptors/common-bin.xml
@@ -364,6 +364,7 @@
         <include>org.apache.camel:camel-testcontainers-junit5</include>
         <include>org.apache.camel:camel-testcontainers-spring</include>
         <include>org.apache.camel:camel-testcontainers-spring-junit5</include>
+        <include>org.apache.camel:camel-threadpoolfactory-vertx</include>
         <include>org.apache.camel:camel-thrift</include>
         <include>org.apache.camel:camel-tika</include>
         <include>org.apache.camel:camel-timer</include>
diff --git a/bom/camel-bom/pom.xml b/bom/camel-bom/pom.xml
index 79bb915..b81eb55 100644
--- a/bom/camel-bom/pom.xml
+++ b/bom/camel-bom/pom.xml
@@ -1775,6 +1775,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.camel</groupId>
+        <artifactId>camel-threadpoolfactory-vertx</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
         <artifactId>camel-thrift</artifactId>
         <version>${project.version}</version>
       </dependency>
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs.properties
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs.properties
index 1784dec..d876d4e 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs.properties
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs.properties
@@ -387,6 +387,7 @@ rabbitmq-component
 random-eip
 reactive-executor-vertx
 reactive-streams-component
+reactive-threadpoolfactory-vertx
 reactor
 recipientList-eip
 ref-component
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/reactive-threadpoolfactory-vertx.adoc
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/reactive-threadpoolfactory-vertx.adoc
new file mode 100644
index 0000000..f04786d
--- /dev/null
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/reactive-threadpoolfactory-vertx.adoc
@@ -0,0 +1,64 @@
+[[threadpoolfactory-vertx-component]]
+= Thread Pool Factory Vert.x Component
+:docTitle: ThreadPoolFactory Vert.x
+:artifactId: camel-threadpoolfactory-vertx
+:description: ThreadPoolFactory for camel-core using Vert.x
+:since: 3.5
+:supportLevel: Preview
+
+*Since Camel {since}*
+
+The camel-threadpoolfactory-vertx is a VertX based implementation of the 
`ThreadPoolFactory` SPI.
+
+By default Camel will use its own thread pool for EIPs that can use parallel 
processing (such as splitter, aggregator).
+You can plug in different engines via an SPI interface. This is a VertX based 
plugin that uses the VertX worker thread pool
+(executeBlocking).
+
+== Restrictions
+
+This implementation has been designed to use VertX worker threads for EIPs 
where concurrency has been enabled (using default settings).
+However this only applies when the EIP is not configured with a 
specific thread pool. For example the first example
+below will use VertX worker threads, and the 2nd below will not:
+
+[source,java]
+----
+from("direct:start")
+    .to("log:foo")
+    .split(body()).parallelProcessing()
+        .to("mock:split")
+    .end()
+    .to("mock:result");
+----
+
+The following Split EIP will refer to a custom thread pool, and therefore 
VertX is not in use, and Camel will
+use the custom thread pool:
+
+[source,java]
+----
+// register a custom thread pool profile with id myLowPool
+context.getExecutorServiceManager().registerThreadPoolProfile(
+    new 
ThreadPoolProfileBuilder("myLowPool").poolSize(2).maxPoolSize(10).build()
+);
+
+from("direct:start")
+    .to("log:foo")
+    .split(body()).executorServiceRef("myLowPool")
+        .to("mock:split")
+    .end()
+    .to("mock:result");
+----
+
+== VertX instance
+
+This implementation will first look in the registry for an existing 
`io.vertx.core.Vertx` instance to use.
+Alternatively, you can configure an instance using the getter/setter on the 
`VertXThreadPoolFactory` class.
+
+== Auto detection from classpath
+
+To use this implementation all you need to do is to add the 
`camel-threadpoolfactory-vertx` dependency to the classpath,
+and Camel should auto-detect this on startup and log as follows:
+
+[source,text]
+----
+Using ThreadPoolFactory: camel-threadpoolfactory-vertx
+----
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties
index c708226..73a6af4 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties
@@ -33,5 +33,6 @@ testcontainers
 testcontainers-junit5
 testcontainers-spring
 testcontainers-spring-junit5
+threadpoolfactory-vertx
 undertow-spring-security
 zipkin
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/threadpoolfactory-vertx.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/threadpoolfactory-vertx.json
new file mode 100644
index 0000000..037eafb
--- /dev/null
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/threadpoolfactory-vertx.json
@@ -0,0 +1,15 @@
+{
+  "other": {
+    "kind": "other",
+    "name": "threadpoolfactory-vertx",
+    "title": "ThreadPoolFactory Vert.x",
+    "description": "ThreadPoolFactory for camel-core using Vert.x",
+    "deprecated": false,
+    "firstVersion": "3.5.0",
+    "label": "reactive",
+    "supportLevel": "Preview",
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-threadpoolfactory-vertx",
+    "version": "3.5.0-SNAPSHOT"
+  }
+}
diff --git 
a/components/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc
 
b/components/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc
index d76c301..dbf107a 100644
--- 
a/components/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc
+++ 
b/components/camel-reactive-executor-vertx/src/main/docs/reactive-executor-vertx.adoc
@@ -21,7 +21,7 @@ At this time this component is an experiment so use it with 
care.
 
 == VertX instance
 
-This implementation will by default create a default `io.vertx.core.Vertx` 
instance to be used.
+This implementation will first look in the registry for an existing 
`io.vertx.core.Vertx` instance to be used.
 However you can configure an existing instance using the getter/setter on the 
`VertXReactiveExecutor` class.
 
 == Auto detection from classpath
@@ -31,25 +31,6 @@ and Camel should auto-detect this on startup and log as 
follows:
 
 [source,text]
 ----
-Using ReactiveExecutor: 
org.apache.camel.reactive.vertx.VertXReactiveExecutor@2a62b5bc
+Using ReactiveExecutor: camel-reactive-executor-vertx
 ----
 
-== Manual enabling
-
-If you use OSGi or the implementation is not added to the classpath, you need 
to enable this explict such:
-
-[source,java]
-----
-CamelContext camel = ...
-
-camel.setReactiveExecutor(new VertXReactiveExecutor());
-----
-
-Or in XML DSL (spring or blueprint XML file) you can declare the factory as a 
`<bean>`:
-
-[source,xml]
-----
-<bean id="vertxReactiveExecutor" 
class="org.apache.camel.reactive.vertx.VertXReactiveExecutor"/>
-----
-
-and then Camel should detect the bean and use the reactive executor.
diff --git 
a/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
 
b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
index 9fe72ec..0ada813 100644
--- 
a/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
+++ 
b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
@@ -16,7 +16,11 @@
  */
 package org.apache.camel.reactive.vertx;
 
+import java.util.Set;
+
 import io.vertx.core.Vertx;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.Experimental;
 import org.apache.camel.StaticService;
 import org.apache.camel.spi.ReactiveExecutor;
@@ -32,12 +36,22 @@ import org.slf4j.LoggerFactory;
  */
 @Experimental
 @JdkService(ReactiveExecutor.FACTORY)
-public class VertXReactiveExecutor extends ServiceSupport implements 
ReactiveExecutor, StaticService {
+public class VertXReactiveExecutor extends ServiceSupport implements 
CamelContextAware, ReactiveExecutor, StaticService {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(VertXReactiveExecutor.class);
 
+    private CamelContext camelContext;
     private Vertx vertx;
-    private boolean shouldClose;
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
 
     public Vertx getVertx() {
         return vertx;
@@ -51,6 +65,25 @@ public class VertXReactiveExecutor extends ServiceSupport 
implements ReactiveExe
     }
 
     @Override
+    protected void doInit() throws Exception {
+        super.doInit();
+        if (vertx == null) {
+            Set<Vertx> set = 
getCamelContext().getRegistry().findByType(Vertx.class);
+            if (set.size() == 1) {
+                vertx = set.iterator().next();
+            }
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        if (vertx == null) {
+            throw new IllegalArgumentException("VertX instance must be 
configured.");
+        }
+    }
+
+    @Override
     public void schedule(Runnable runnable) {
         LOG.trace("schedule: {}", runnable);
         vertx.nettyEventLoopGroup().execute(runnable);
@@ -79,23 +112,6 @@ public class VertXReactiveExecutor extends ServiceSupport 
implements ReactiveExe
     }
 
     @Override
-    protected void doStart() throws Exception {
-        if (vertx == null) {
-            LOG.debug("Starting VertX");
-            shouldClose = true;
-            vertx = Vertx.vertx();
-        }
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        if (vertx != null && shouldClose) {
-            LOG.debug("Stopping VertX");
-            vertx.close();
-        }
-    }
-
-    @Override
     public String toString() {
         return "camel-reactive-executor-vertx";
     }
diff --git 
a/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
 
b/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockLookupVertxTest.java
similarity index 85%
copy from 
components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
copy to 
components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockLookupVertxTest.java
index 5e1c01b..79683fc 100644
--- 
a/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
+++ 
b/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockLookupVertxTest.java
@@ -16,21 +16,21 @@
  */
 package org.apache.camel.reactive;
 
+import io.vertx.core.Vertx;
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.engine.AbstractCamelContext;
-import org.apache.camel.reactive.vertx.VertXReactiveExecutor;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
-public class SimpleMockTest extends CamelTestSupport {
+public class SimpleMockLookupVertxTest extends CamelTestSupport {
+
+    private final Vertx vertx = Vertx.vertx();
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
-        AbstractCamelContext context = new DefaultCamelContext();
-        context.setReactiveExecutor(new VertXReactiveExecutor());
+        CamelContext context = super.createCamelContext();
+        context.getRegistry().bind("vertx", vertx);
         return context;
     }
 
diff --git 
a/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
 
b/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
index 5e1c01b..2a39514 100644
--- 
a/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
+++ 
b/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
@@ -16,21 +16,26 @@
  */
 package org.apache.camel.reactive;
 
+import io.vertx.core.Vertx;
 import org.apache.camel.CamelContext;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.engine.AbstractCamelContext;
 import org.apache.camel.reactive.vertx.VertXReactiveExecutor;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
 public class SimpleMockTest extends CamelTestSupport {
 
+    private final Vertx vertx = Vertx.vertx();
+
     @Override
     protected CamelContext createCamelContext() throws Exception {
-        AbstractCamelContext context = new DefaultCamelContext();
-        context.setReactiveExecutor(new VertXReactiveExecutor());
+        CamelContext context = super.createCamelContext();
+
+        VertXReactiveExecutor re = (VertXReactiveExecutor) 
context.adapt(ExtendedCamelContext.class).getReactiveExecutor();
+        re.setVertx(vertx);
+
         return context;
     }
 
diff --git a/components/camel-threadpoolfactory-vertx/pom.xml 
b/components/camel-threadpoolfactory-vertx/pom.xml
new file mode 100644
index 0000000..45fde68
--- /dev/null
+++ b/components/camel-threadpoolfactory-vertx/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.camel</groupId>
+        <artifactId>components</artifactId>
+        <version>3.5.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>camel-threadpoolfactory-vertx</artifactId>
+    <packaging>jar</packaging>
+    <name>Camel :: Thread Pool Factory :: Vert.x</name>
+    <description>ThreadPoolFactory for camel-core using Vert.x</description>
+
+    <properties>
+        <firstVersion>3.5.0</firstVersion>
+        <label>reactive</label>
+        <title>ThreadPoolFactory Vert.x</title>
+        <supportLevel>preview</supportLevel>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-support</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.vertx</groupId>
+            <artifactId>vertx-core</artifactId>
+            <version>${vertx-version}</version>
+        </dependency>
+
+        <!-- testing -->
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git 
a/components/camel-threadpoolfactory-vertx/src/generated/resources/META-INF/services/org/apache/camel/other.properties
 
b/components/camel-threadpoolfactory-vertx/src/generated/resources/META-INF/services/org/apache/camel/other.properties
new file mode 100644
index 0000000..c9aede5
--- /dev/null
+++ 
b/components/camel-threadpoolfactory-vertx/src/generated/resources/META-INF/services/org/apache/camel/other.properties
@@ -0,0 +1,7 @@
+# Generated by camel build tools - do NOT edit this file!
+name=threadpoolfactory-vertx
+groupId=org.apache.camel
+artifactId=camel-threadpoolfactory-vertx
+version=3.5.0-SNAPSHOT
+projectName=Camel :: Thread Pool Factory :: Vert.x
+projectDescription=ThreadPoolFactory for camel-core using Vert.x
diff --git 
a/components/camel-threadpoolfactory-vertx/src/generated/resources/META-INF/services/org/apache/camel/reactive-executor
 
b/components/camel-threadpoolfactory-vertx/src/generated/resources/META-INF/services/org/apache/camel/reactive-executor
new file mode 100644
index 0000000..1e1e324
--- /dev/null
+++ 
b/components/camel-threadpoolfactory-vertx/src/generated/resources/META-INF/services/org/apache/camel/reactive-executor
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.reactive.vertx.VertXReactiveExecutor
diff --git 
a/components/camel-threadpoolfactory-vertx/src/generated/resources/META-INF/services/org/apache/camel/thread-pool-factory
 
b/components/camel-threadpoolfactory-vertx/src/generated/resources/META-INF/services/org/apache/camel/thread-pool-factory
new file mode 100644
index 0000000..b938ef6
--- /dev/null
+++ 
b/components/camel-threadpoolfactory-vertx/src/generated/resources/META-INF/services/org/apache/camel/thread-pool-factory
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.reactive.vertx.VertXThreadPoolFactory
diff --git 
a/components/camel-threadpoolfactory-vertx/src/generated/resources/reactive-executor-vertx.json
 
b/components/camel-threadpoolfactory-vertx/src/generated/resources/reactive-executor-vertx.json
new file mode 100644
index 0000000..217160b
--- /dev/null
+++ 
b/components/camel-threadpoolfactory-vertx/src/generated/resources/reactive-executor-vertx.json
@@ -0,0 +1,15 @@
+{
+  "other": {
+    "kind": "other",
+    "name": "reactive-executor-vertx",
+    "title": "Reactive Executor Vert.x",
+    "description": "Reactive Executor for camel-core using Vert.x",
+    "deprecated": false,
+    "firstVersion": "3.0.0",
+    "label": "reactive",
+    "supportLevel": "Experimental",
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-reactive-executor-vertx",
+    "version": "3.5.0-SNAPSHOT"
+  }
+}
diff --git 
a/components/camel-threadpoolfactory-vertx/src/generated/resources/threadpoolfactory-vertx.json
 
b/components/camel-threadpoolfactory-vertx/src/generated/resources/threadpoolfactory-vertx.json
new file mode 100644
index 0000000..037eafb
--- /dev/null
+++ 
b/components/camel-threadpoolfactory-vertx/src/generated/resources/threadpoolfactory-vertx.json
@@ -0,0 +1,15 @@
+{
+  "other": {
+    "kind": "other",
+    "name": "threadpoolfactory-vertx",
+    "title": "ThreadPoolFactory Vert.x",
+    "description": "ThreadPoolFactory for camel-core using Vert.x",
+    "deprecated": false,
+    "firstVersion": "3.5.0",
+    "label": "reactive",
+    "supportLevel": "Preview",
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-threadpoolfactory-vertx",
+    "version": "3.5.0-SNAPSHOT"
+  }
+}
diff --git 
a/components/camel-threadpoolfactory-vertx/src/main/docs/reactive-threadpoolfactory-vertx.adoc
 
b/components/camel-threadpoolfactory-vertx/src/main/docs/reactive-threadpoolfactory-vertx.adoc
new file mode 100644
index 0000000..9a487b1
--- /dev/null
+++ 
b/components/camel-threadpoolfactory-vertx/src/main/docs/reactive-threadpoolfactory-vertx.adoc
@@ -0,0 +1,64 @@
+[[threadpoolfactory-vertx-component]]
+= Thread Pool Factory Vert.x Component
+:docTitle: ThreadPoolFactory Vert.x
+:artifactId: camel-threadpoolfactory-vertx
+:description: ThreadPoolFactory for camel-core using Vert.x
+:since: 3.5
+:supportLevel: Preview
+
+*Since Camel {since}*
+
+The camel-threadpoolfactory-vertx is a VertX based implementation of the 
`ThreadPoolFactory` SPI.
+
+By default Camel will use its own thread pool for EIPs that can use parallel 
processing (such as splitter, aggregator).
+You can plug in different engines via an SPI interface. This is a VertX based 
plugin that uses the VertX worker thread pool
+(executeBlocking).
+
+== Restrictions
+
+This implementation has been designed to use VertX worker threads for EIPs 
where concurrency has been enabled (using default settings).
+However this only applies when the EIP is not configured with a 
specific thread pool. For example the first example
+below will use VertX worker threads, and the 2nd below will not:
+
+[source,java]
+----
+from("direct:start")
+    .to("log:foo")
+    .split(body()).parallelProcessing()
+        .to("mock:split")
+    .end()
+    .to("mock:result");
+----
+
+The following Split EIP will refer to a custom thread pool, and therefore 
VertX is not in use, and Camel will
+use the custom thread pool:
+
+[source,java]
+----
+// register a custom thread pool profile with id myLowPool
+context.getExecutorServiceManager().registerThreadPoolProfile(
+    new 
ThreadPoolProfileBuilder("myLowPool").poolSize(2).maxPoolSize(10).build()
+);
+
+from("direct:start")
+    .to("log:foo")
+    .split(body()).executorServiceRef("myLowPool")
+        .to("mock:split")
+    .end()
+    .to("mock:result");
+----
+
+== VertX instance
+
+This implementation will first look in the registry for an existing 
`io.vertx.core.Vertx` instance to use.
+Alternatively, you can configure an instance using the getter/setter on the 
`VertXThreadPoolFactory` class.
+
+== Auto detection from classpath
+
+To use this implementation all you need to do is to add the 
`camel-threadpoolfactory-vertx` dependency to the classpath,
+and Camel should auto-detect this on startup and log as follows:
+
+[source,text]
+----
+Using ThreadPoolFactory: camel-threadpoolfactory-vertx
+----
diff --git 
a/components/camel-threadpoolfactory-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXThreadPoolFactory.java
 
b/components/camel-threadpoolfactory-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXThreadPoolFactory.java
new file mode 100644
index 0000000..5c72b86
--- /dev/null
+++ 
b/components/camel-threadpoolfactory-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXThreadPoolFactory.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.reactive.vertx;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import io.vertx.core.Vertx;
+import org.apache.camel.spi.SimpleExecutorService;
+import org.apache.camel.spi.ThreadPoolFactory;
+import org.apache.camel.spi.ThreadPoolProfile;
+import org.apache.camel.spi.annotations.JdkService;
+import org.apache.camel.support.DefaultThreadPoolFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@JdkService(ThreadPoolFactory.FACTORY)
+public class VertXThreadPoolFactory extends DefaultThreadPoolFactory 
implements ThreadPoolFactory {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(VertXThreadPoolFactory.class);
+
+    private final ExecutorService vertxExecutorService = new 
VertXExecutorService();
+    private Vertx vertx;
+
+    public Vertx getVertx() {
+        return vertx;
+    }
+
+    /**
+     * To use an existing instance of {@link Vertx} instead of looking one up 
in the registry.
+     */
+    public void setVertx(Vertx vertx) {
+        this.vertx = vertx;
+    }
+
+    @Override
+    protected void doInit() throws Exception {
+        super.doInit();
+        if (vertx == null) {
+            Set<Vertx> set = 
getCamelContext().getRegistry().findByType(Vertx.class);
+            if (set.size() == 1) {
+                vertx = set.iterator().next();
+            }
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        if (vertx == null) {
+            throw new IllegalArgumentException("VertX instance must be 
configured.");
+        }
+    }
+
+    @Override
+    public ExecutorService newThreadPool(ThreadPoolProfile profile, 
ThreadFactory threadFactory) {
+        // only do this for default profile (which means it's not a custom pool)
+        if (profile.isDefaultProfile()) {
+            return vertxExecutorService;
+        } else {
+            return super.newThreadPool(profile, threadFactory);
+        }
+    }
+
+    @Override
+    public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+        return vertxExecutorService;
+    }
+
+    @Override
+    public String toString() {
+        return "camel-threadpoolfactory-vertx";
+    }
+
+    private class VertXExecutorService implements ExecutorService, 
SimpleExecutorService {
+
+        @Override
+        public void shutdown() {
+            // noop
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            // noop
+            return null;
+        }
+
+        @Override
+        public boolean isShutdown() {
+            return false;
+        }
+
+        @Override
+        public boolean isTerminated() {
+            return false;
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+            return false;
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task) {
+            return null;
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result) {
+            return null;
+        }
+
+        @Override
+        public Future<?> submit(Runnable task) {
+            LOG.trace("submit: {}", task);
+            final CompletableFuture<?> answer = new CompletableFuture<>();
+            // used by vertx
+            vertx.executeBlocking(future -> {
+                task.run();
+                future.complete();
+            }, res -> answer.complete(null));
+            return answer;
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> 
tasks) throws InterruptedException {
+            return null;
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> 
tasks, long timeout, TimeUnit unit) throws InterruptedException {
+            return null;
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws 
InterruptedException, ExecutionException {
+            return null;
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long 
timeout, TimeUnit unit) throws InterruptedException, ExecutionException, 
TimeoutException {
+            return null;
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            LOG.trace("execute: {}", command);
+            // used by vertx
+            vertx.executeBlocking(future -> {
+                command.run();
+                future.complete();
+            }, null);
+        }
+    }
+
+}
diff --git 
a/components/camel-threadpoolfactory-vertx/src/test/java/org/apache/camel/reactive/SplitCustomThreadPoolTest.java
 
b/components/camel-threadpoolfactory-vertx/src/test/java/org/apache/camel/reactive/SplitCustomThreadPoolTest.java
new file mode 100644
index 0000000..2888df0
--- /dev/null
+++ 
b/components/camel-threadpoolfactory-vertx/src/test/java/org/apache/camel/reactive/SplitCustomThreadPoolTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.reactive;
+
+import io.vertx.core.Vertx;
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.builder.ThreadPoolProfileBuilder;
+import org.apache.camel.reactive.vertx.VertXThreadPoolFactory;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verifies that a Split EIP which refers to a custom thread pool profile is processed by
+ * Camel threads, while the {@link VertXThreadPoolFactory} is configured as thread pool factory.
+ */
+public class SplitCustomThreadPoolTest extends CamelTestSupport {
+
+    private final Vertx vertx = Vertx.vertx();
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+
+        // the cast expects the vertx based factory to be in use; hand it the vertx instance owned by this test
+        VertXThreadPoolFactory tpf = (VertXThreadPoolFactory) context.getExecutorServiceManager().getThreadPoolFactory();
+        tpf.setVertx(vertx);
+
+        return context;
+    }
+
+    @Test
+    public void testSplit() throws Exception {
+        try {
+            getMockEndpoint("mock:result").expectedBodiesReceived("A,B,C,D,E,F,G,H,I,J");
+            getMockEndpoint("mock:split").expectedBodiesReceivedInAnyOrder("A", "B", "C", "D", "E", "F", "G", "H", "I", "J");
+
+            template.sendBody("direct:start", "A,B,C,D,E,F,G,H,I,J");
+
+            assertMockEndpointsSatisfied();
+        } finally {
+            // always release the vertx instance, also when an expectation is not met
+            vertx.close();
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // register a custom thread pool profile with id myLowPool
+                context.getExecutorServiceManager().registerThreadPoolProfile(
+                    new ThreadPoolProfileBuilder("myLowPool").poolSize(2).maxPoolSize(10).build()
+                );
+
+                from("direct:start")
+                    .to("log:foo")
+                    .split(body()).executorServiceRef("myLowPool")
+                        .to("log:bar")
+                        .process(e -> {
+                            // the split refers to a custom pool, so the sub messages must not run on vertx threads
+                            String name = Thread.currentThread().getName();
+                            Assert.assertTrue("Should use Camel thread", name.startsWith("Camel"));
+                        })
+                        .to("mock:split")
+                    .end()
+                    .to("log:result")
+                    .to("mock:result");
+            }
+        };
+    }
+}
diff --git 
a/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
 
b/components/camel-threadpoolfactory-vertx/src/test/java/org/apache/camel/reactive/SplitParallelLookupVertxTest.java
similarity index 54%
copy from 
components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
copy to 
components/camel-threadpoolfactory-vertx/src/test/java/org/apache/camel/reactive/SplitParallelLookupVertxTest.java
index 5e1c01b..39ec41f 100644
--- 
a/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
+++ 
b/components/camel-threadpoolfactory-vertx/src/test/java/org/apache/camel/reactive/SplitParallelLookupVertxTest.java
@@ -16,43 +16,34 @@
  */
 package org.apache.camel.reactive;
 
+import io.vertx.core.Vertx;
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.engine.AbstractCamelContext;
-import org.apache.camel.reactive.vertx.VertXReactiveExecutor;
 import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Assert;
 import org.junit.Test;
 
-public class SimpleMockTest extends CamelTestSupport {
+public class SplitParallelLookupVertxTest extends CamelTestSupport {
+
+    private final Vertx vertx = Vertx.vertx();
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
-        AbstractCamelContext context = new DefaultCamelContext();
-        context.setReactiveExecutor(new VertXReactiveExecutor());
+        CamelContext context = super.createCamelContext();
+        context.getRegistry().bind("vertx", vertx);
         return context;
     }
 
     @Test
-    public void testSimple() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("Hello World");
+    public void testSplit() throws Exception {
+        
getMockEndpoint("mock:result").expectedBodiesReceived("A,B,C,D,E,F,G,H,I,J");
+        getMockEndpoint("mock:split").expectedBodiesReceivedInAnyOrder("A", 
"B", "C", "D", "E", "F", "G", "H", "I", "J");
 
-        template.sendBody("direct:start", "Hello World");
+        template.sendBody("direct:start", "A,B,C,D,E,F,G,H,I,J");
 
         assertMockEndpointsSatisfied();
-    }
-
-    @Test
-    public void testSimpleTwoMessages() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("Hello World", "Bye World");
-
-        template.sendBody("direct:start", "Hello World");
-        template.sendBody("direct:start", "Bye World");
 
-        assertMockEndpointsSatisfied();
+        vertx.close();
     }
 
     @Override
@@ -60,7 +51,18 @@ public class SimpleMockTest extends CamelTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                
from("direct:start").to("log:foo").to("log:bar").to("mock:result");
+                from("direct:start")
+                    .to("log:foo")
+                    .split(body()).parallelProcessing()
+                        .to("log:bar")
+                        .process(e -> {
+                            String name = Thread.currentThread().getName();
+                            Assert.assertTrue("Should use vertx thread", 
name.startsWith("vert.x-worker-thread"));
+                        })
+                        .to("mock:split")
+                    .end()
+                    .to("log:result")
+                    .to("mock:result");
             }
         };
     }
diff --git 
a/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
 
b/components/camel-threadpoolfactory-vertx/src/test/java/org/apache/camel/reactive/SplitParallelTest.java
similarity index 51%
copy from 
components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
copy to 
components/camel-threadpoolfactory-vertx/src/test/java/org/apache/camel/reactive/SplitParallelTest.java
index 5e1c01b..30dacdf 100644
--- 
a/components/camel-reactive-executor-vertx/src/test/java/org/apache/camel/reactive/SimpleMockTest.java
+++ 
b/components/camel-threadpoolfactory-vertx/src/test/java/org/apache/camel/reactive/SplitParallelTest.java
@@ -16,43 +16,38 @@
  */
 package org.apache.camel.reactive;
 
+import io.vertx.core.Vertx;
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.engine.AbstractCamelContext;
-import org.apache.camel.reactive.vertx.VertXReactiveExecutor;
+import org.apache.camel.reactive.vertx.VertXThreadPoolFactory;
 import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Assert;
 import org.junit.Test;
 
-public class SimpleMockTest extends CamelTestSupport {
+public class SplitParallelTest extends CamelTestSupport {
+
+    private final Vertx vertx = Vertx.vertx();
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
-        AbstractCamelContext context = new DefaultCamelContext();
-        context.setReactiveExecutor(new VertXReactiveExecutor());
-        return context;
-    }
-
-    @Test
-    public void testSimple() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("Hello World");
+        CamelContext context = super.createCamelContext();
 
-        template.sendBody("direct:start", "Hello World");
+        VertXThreadPoolFactory tpf = (VertXThreadPoolFactory) 
context.getExecutorServiceManager().getThreadPoolFactory();
+        tpf.setVertx(vertx);
 
-        assertMockEndpointsSatisfied();
+        return context;
     }
 
     @Test
-    public void testSimpleTwoMessages() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("Hello World", "Bye World");
+    public void testSplit() throws Exception {
+        
getMockEndpoint("mock:result").expectedBodiesReceived("A,B,C,D,E,F,G,H,I,J");
+        getMockEndpoint("mock:split").expectedBodiesReceivedInAnyOrder("A", 
"B", "C", "D", "E", "F", "G", "H", "I", "J");
 
-        template.sendBody("direct:start", "Hello World");
-        template.sendBody("direct:start", "Bye World");
+        template.sendBody("direct:start", "A,B,C,D,E,F,G,H,I,J");
 
         assertMockEndpointsSatisfied();
+
+        vertx.close();
     }
 
     @Override
@@ -60,7 +55,18 @@ public class SimpleMockTest extends CamelTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                
from("direct:start").to("log:foo").to("log:bar").to("mock:result");
+                from("direct:start")
+                    .to("log:foo")
+                    .split(body()).parallelProcessing()
+                        .to("log:bar")
+                        .process(e -> {
+                            String name = Thread.currentThread().getName();
+                            Assert.assertTrue("Should use vertx thread", 
name.startsWith("vert.x-worker-thread"));
+                        })
+                        .to("mock:split")
+                    .end()
+                    .to("log:result")
+                    .to("mock:result");
             }
         };
     }
diff --git 
a/components/camel-threadpoolfactory-vertx/src/test/resources/log4j2.properties 
b/components/camel-threadpoolfactory-vertx/src/test/resources/log4j2.properties
new file mode 100644
index 0000000..b06ea95
--- /dev/null
+++ 
b/components/camel-threadpoolfactory-vertx/src/test/resources/log4j2.properties
@@ -0,0 +1,31 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+appender.out.type = File
+appender.out.name = out
+appender.out.fileName = target/camel-threadpoolfactory-vertx.log
+appender.out.layout.type = PatternLayout
+appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+appender.stdout.type = Console
+appender.stdout.name = stdout
+appender.stdout.layout.type = PatternLayout
+appender.stdout.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+rootLogger.level = INFO
+
+rootLogger.appenderRef.out.ref = out
+#rootLogger.appenderRef.out.ref = stdout
diff --git a/components/pom.xml b/components/pom.xml
index bb3aeaf..26fd88b 100644
--- a/components/pom.xml
+++ b/components/pom.xml
@@ -354,6 +354,7 @@
         <module>camel-tagsoup</module>
         <module>camel-tarfile</module>
         <module>camel-telegram</module>
+        <module>camel-threadpoolfactory-vertx</module>
         <module>camel-thrift</module>
         <module>camel-tika</module>
         <module>camel-twilio</module>
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/SimpleExecutorService.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/SimpleExecutorService.java
new file mode 100644
index 0000000..e3ff3eb
--- /dev/null
+++ 
b/core/camel-api/src/main/java/org/apache/camel/spi/SimpleExecutorService.java
@@ -0,0 +1,11 @@
+package org.apache.camel.spi;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Marker interface to signal that an {@link ExecutorService} is simple: tasks are only
+ * submitted via {@link ExecutorService#submit(Runnable)} or executed via
+ * {@link ExecutorService#execute(Runnable)}.
+ */
+public interface SimpleExecutorService {
+}
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java
index 45492bc..a25a539 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java
@@ -29,6 +29,11 @@ import java.util.concurrent.ThreadFactory;
 public interface ThreadPoolFactory {
 
     /**
+     * Service factory key.
+     */
+    String FACTORY = "thread-pool-factory";
+
+    /**
      * Creates a new cached thread pool
      * <p/>
      * The cached thread pool is a term from the JDK from the method {@link 
java.util.concurrent.Executors#newCachedThreadPool()}.
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index d1a0012..9a650cf 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -2794,6 +2794,13 @@ public abstract class AbstractCamelContext extends 
BaseService
             LOG.debug("Using ReactiveExecutor: {}", getReactiveExecutor());
         }
 
+        // lets log at INFO level if we are not using the default thread pool 
factory
+        if 
(!getExecutorServiceManager().getThreadPoolFactory().getClass().getSimpleName().equals("DefaultThreadPoolFactory"))
 {
+            LOG.info("Using ThreadPoolFactory: {}", 
getExecutorServiceManager().getThreadPoolFactory());
+        } else {
+            LOG.debug("Using ThreadPoolFactory: {}", 
getExecutorServiceManager().getThreadPoolFactory());
+        }
+
         HealthCheckRegistry hcr = getExtension(HealthCheckRegistry.class);
         if (hcr != null && hcr.isEnabled()) {
             LOG.info("Using HealthCheck: {}", hcr.getId());
@@ -2926,15 +2933,17 @@ public abstract class AbstractCamelContext extends 
BaseService
             }
         }
 
-        // shutdown executor service, reactive executor and management as the 
last one
-        shutdownServices(executorServiceManager);
-        shutdownServices(reactiveExecutor);
+        // shutdown management and lifecycle after all other services
         shutdownServices(managementStrategy);
         shutdownServices(managementMBeanAssembler);
         shutdownServices(lifecycleStrategies);
         // do not clear lifecycleStrategies as we can start Camel again and get
         // the route back as before
 
+        // shutdown executor service, reactive executor last
+        shutdownServices(executorServiceManager);
+        shutdownServices(reactiveExecutor);
+
         // shutdown type converter as late as possible
         ServiceHelper.stopService(typeConverter);
         ServiceHelper.stopService(typeConverterRegistry);
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
index 371c40b..e8d6404 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
@@ -30,14 +30,17 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.NamedNode;
 import org.apache.camel.StaticService;
 import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.LifecycleStrategy;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.ThreadPoolFactory;
 import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.DefaultThreadPoolFactory;
+import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.StopWatch;
@@ -58,7 +61,7 @@ public class BaseExecutorServiceManager extends 
ServiceSupport implements Execut
     private static final Logger LOG = 
LoggerFactory.getLogger(BaseExecutorServiceManager.class);
 
     private final CamelContext camelContext;
-    private ThreadPoolFactory threadPoolFactory = new 
DefaultThreadPoolFactory();
+    private ThreadPoolFactory threadPoolFactory;
     private final List<ExecutorService> executorServices = new 
CopyOnWriteArrayList<>();
     private String threadNamePattern;
     private long shutdownAwaitTermination = 10000;
@@ -427,10 +430,28 @@ public class BaseExecutorServiceManager extends 
ServiceSupport implements Execut
     @Override
     protected void doInit() throws Exception {
         super.doInit();
+
         if (threadNamePattern == null) {
             // set default name pattern which includes the camel context name
             threadNamePattern = "Camel (" + camelContext.getName() + ") thread 
##counter# - #name#";
         }
+
+        // discover thread pool factory
+        if (threadPoolFactory == null) {
+            threadPoolFactory = new 
BaseServiceResolver<>(ThreadPoolFactory.FACTORY, ThreadPoolFactory.class)
+                    .resolve(camelContext)
+                    .orElseGet(DefaultThreadPoolFactory::new);
+        }
+        if (threadPoolFactory instanceof CamelContextAware) {
+            ((CamelContextAware) 
threadPoolFactory).setCamelContext(camelContext);
+        }
+        ServiceHelper.initService(threadPoolFactory);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        ServiceHelper.startService(threadPoolFactory);
     }
 
     @Override
@@ -478,6 +499,8 @@ public class BaseExecutorServiceManager extends 
ServiceSupport implements Execut
                 it.remove();
             }
         }
+
+        ServiceHelper.stopAndShutdownServices(threadPoolFactory);
     }
 
     /**
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAnnotationBasedProcessorFactory.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAnnotationBasedProcessorFactory.java
index 6aeb5e5..468b9cf 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAnnotationBasedProcessorFactory.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAnnotationBasedProcessorFactory.java
@@ -54,7 +54,7 @@ public final class DefaultAnnotationBasedProcessorFactory 
implements AnnotationB
         recipientList.setShareUnitOfWork(annotation.shareUnitOfWork());
 
         if (ObjectHelper.isNotEmpty(annotation.executorServiceRef())) {
-            ExecutorService executor = 
camelContext.getExecutorServiceManager().newDefaultThreadPool(this, 
annotation.executorServiceRef());
+            ExecutorService executor = 
camelContext.getExecutorServiceManager().newThreadPool(this, "@RecipientList", 
annotation.executorServiceRef());
             recipientList.setExecutorService(executor);
         }
 
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerTemplate.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerTemplate.java
index ec30247..8b9f6c1 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerTemplate.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerTemplate.java
@@ -782,8 +782,7 @@ public class DefaultProducerTemplate extends ServiceSupport 
implements ProducerT
             synchronized (lock) {
                 if (executor == null) {
                     if (threadedAsyncMode) {
-                        executor = 
ObjectHelper.notNull(camelContext.getExecutorServiceManager().newDefaultThreadPool(this,
 "ProducerTemplate"),
-                                "ExecutorService");
+                        executor = 
camelContext.getExecutorServiceManager().newDefaultThreadPool(this, 
"ProducerTemplate");
                     } else {
                         executor = new SynchronousExecutorService();
                     }
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultShutdownStrategy.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultShutdownStrategy.java
index 4210f28..25597e8 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultShutdownStrategy.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultShutdownStrategy.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -199,10 +200,12 @@ public class DefaultShutdownStrategy extends 
ServiceSupport implements ShutdownS
 
         // use another thread to perform the shutdowns so we can support 
timeout
         timeoutOccurred.set(false);
-        currentShutdownTaskFuture = getExecutorService().submit(new 
ShutdownTask(context, routesOrdered, timeout, timeUnit, suspendOnly,
-            abortAfterTimeout, timeoutOccurred, 
isLogInflightExchangesOnTimeout()));
         try {
+            currentShutdownTaskFuture = getExecutorService().submit(new 
ShutdownTask(context, routesOrdered, timeout, timeUnit, suspendOnly,
+                    abortAfterTimeout, timeoutOccurred, 
isLogInflightExchangesOnTimeout()));
             currentShutdownTaskFuture.get(timeout, timeUnit);
+        } catch (RejectedExecutionException e) {
+            // the task was rejected
         } catch (ExecutionException e) {
             // unwrap execution exception
             throw 
RuntimeCamelException.wrapRuntimeCamelException(e.getCause());
@@ -430,7 +433,7 @@ public class DefaultShutdownStrategy extends ServiceSupport 
implements ShutdownS
     private ExecutorService getExecutorService() {
         if (executor == null) {
             // use a thread pool that allow to terminate idle threads so they 
do not hang around forever
-            executor = 
camelContext.getExecutorServiceManager().newThreadPool(this, "ShutdownTask", 0, 
1);
+            executor = 
camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, 
"ShutdownTask");
         }
         return executor;
     }
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 
b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index aada58c..e14b243 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -1541,7 +1541,7 @@ public class AggregateProcessor extends 
AsyncProcessorSupport implements Navigat
         if (optimisticLocking) {
             lock = NoLock.INSTANCE;
             if (getOptimisticLockingExecutorService() == null) {
-                
setOptimisticLockingExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this,
 AGGREGATE_OPTIMISTIC_LOCKING_EXECUTOR, 1));
+                
setOptimisticLockingExecutorService(camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
 AGGREGATE_OPTIMISTIC_LOCKING_EXECUTOR));
                 shutdownOptimisticLockingExecutorService = true;
             }
         } else {
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java
index 4bb7f23..5e1246b 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java
@@ -28,8 +28,12 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.StaticService;
 import org.apache.camel.spi.ThreadPoolFactory;
 import org.apache.camel.spi.ThreadPoolProfile;
+import org.apache.camel.support.service.ServiceSupport;
 import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
 import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
 import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
@@ -37,7 +41,19 @@ import 
org.apache.camel.util.concurrent.SizedScheduledExecutorService;
 /**
  * Factory for thread pools that uses the JDK {@link Executors} for creating 
the thread pools.
  */
-public class DefaultThreadPoolFactory implements ThreadPoolFactory {
+public class DefaultThreadPoolFactory extends ServiceSupport implements 
CamelContextAware, ThreadPoolFactory, StaticService {
+
+    private CamelContext camelContext;
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
 
     @Override
     public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
diff --git a/docs/components/modules/others/nav.adoc 
b/docs/components/modules/others/nav.adoc
index 7410a5f..f0914d2 100644
--- a/docs/components/modules/others/nav.adoc
+++ b/docs/components/modules/others/nav.adoc
@@ -38,5 +38,6 @@
 ** xref:testcontainers-junit5.adoc[Testcontainers JUnit5]
 ** xref:testcontainers-spring.adoc[Testcontainers Spring]
 ** xref:testcontainers-spring-junit5.adoc[Testcontainers Spring Junit5]
+** xref:reactive-threadpoolfactory-vertx.adoc[ThreadPoolFactory Vert.x]
 ** xref:undertow-spring-security.adoc[Undertow Spring Security]
 ** xref:zipkin.adoc[Zipkin]
diff --git 
a/docs/components/modules/others/pages/reactive-threadpoolfactory-vertx.adoc 
b/docs/components/modules/others/pages/reactive-threadpoolfactory-vertx.adoc
new file mode 100644
index 0000000..cf33fda
--- /dev/null
+++ b/docs/components/modules/others/pages/reactive-threadpoolfactory-vertx.adoc
@@ -0,0 +1,66 @@
+[[threadpoolfactory-vertx-component]]
+= Thread Pool Factory Vert.x Component
+//THIS FILE IS COPIED: EDIT THE SOURCE FILE:
+:page-source: 
components/camel-threadpoolfactory-vertx/src/main/docs/reactive-threadpoolfactory-vertx.adoc
+:docTitle: ThreadPoolFactory Vert.x
+:artifactId: camel-threadpoolfactory-vertx
+:description: ThreadPoolFactory for camel-core using Vert.x
+:since: 3.5
+:supportLevel: Preview
+
+*Since Camel {since}*
+
+The camel-threadpoolfactory-vertx is a VertX based implementation of the 
`ThreadPoolFactory` SPI.
+
+By default Camel will use its own thread pool for EIPs that can use parallel 
processing (such as splitter, aggregator).
+You can plug in different engines via an SPI interface. This is a VertX based 
plugin that uses the VertX worker thread pool
+(executeBlocking).
+
+== Restrictions
+
+This implementation has been designed to use VertX worker threads for EIPs 
where concurrency has been enabled (using default settings).
+However, this only applies when the EIP is not configured with a 
specific thread pool. For example, the first example
+below will use VertX worker threads, and the second will not:
+
+[source,java]
+----
+from("direct:start")
+    .to("log:foo")
+    .split(body()).parallelProcessing()
+        .to("mock:split")
+    .end()
+    .to("mock:result");
+----
+
+The following Split EIP will refer to a custom thread pool, and therefore 
VertX is not in use, and Camel will
+use the custom thread pool:
+
+[source,java]
+----
+// register a custom thread pool profile with id myLowPool
+context.getExecutorServiceManager().registerThreadPoolProfile(
+    new 
ThreadPoolProfileBuilder("myLowPool").poolSize(2).maxPoolSize(10).build()
+);
+
+from("direct:start")
+    .to("log:foo")
+    .split(body()).executorServiceRef("myLowPool")
+        .to("mock:split")
+    .end()
+    .to("mock:result");
+----
+
+== VertX instance
+
+This implementation will by default create a default `io.vertx.core.Vertx` 
instance to be used.
+However you can configure an existing instance using the getter/setter on the 
`VertXThreadPoolFactory` class.
+
+== Auto detection from classpath
+
+To use this implementation all you need to do is to add the 
`camel-threadpoolfactory-vertx` dependency to the classpath,
+and Camel should auto-detect this on startup and log as follows:
+
+[source,text]
+----
+Using ThreadPoolFactory: camel-threadpoolfactory-vertx
+----
diff --git a/parent/pom.xml b/parent/pom.xml
index c2f50c3..c62842f 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -2364,6 +2364,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.camel</groupId>
+        <artifactId>camel-threadpoolfactory-vertx</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
         <artifactId>camel-thrift</artifactId>
         <version>${project.version}</version>
       </dependency>

Reply via email to