This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.x by this push:
     new 7432698  CAMEL-14107 Backporting to 2.x changestream feature to 
mongodb component (#3288)
7432698 is described below

commit 7432698322d38057a9c84e08dd7782c846eeef00
Author: Pasquale Congiusti <pasquale.congiu...@gmail.com>
AuthorDate: Mon Oct 28 17:08:48 2019 +0100

    CAMEL-14107 Backporting to 2.x changestream feature to mongodb component 
(#3288)
---
 .../src/main/docs/mongodb3-component.adoc          |  44 ++++-
 .../mongodb3/MongoAbstractConsumerThread.java      | 112 +++++++++++
 ...umer.java => MongoDbChangeStreamsConsumer.java} |  40 ++--
 .../mongodb3/MongoDbChangeStreamsThread.java       |  94 +++++++++
 .../component/mongodb3/MongoDbConsumerType.java    |   4 +-
 .../camel/component/mongodb3/MongoDbEndpoint.java  | 100 +++++++---
 .../mongodb3/MongoDbTailableCursorConsumer.java    |  13 +-
 .../component/mongodb3/MongoDbTailingProcess.java  | 219 ---------------------
 .../component/mongodb3/MongoDbTailingThread.java   | 144 ++++++++++++++
 .../mongodb3/EmbedMongoConfiguration.java          |  15 +-
 .../mongodb3/MongoDbChangeStreamsConsumerTest.java | 113 +++++++++++
 11 files changed, 615 insertions(+), 283 deletions(-)

diff --git a/components/camel-mongodb3/src/main/docs/mongodb3-component.adoc 
b/components/camel-mongodb3/src/main/docs/mongodb3-component.adoc
index c60c085..5bbb567 100644
--- a/components/camel-mongodb3/src/main/docs/mongodb3-component.adoc
+++ b/components/camel-mongodb3/src/main/docs/mongodb3-component.adoc
@@ -1,9 +1,12 @@
 [[mongodb3-component]]
-= MongoDB Component
+== MongoDB Component
+
+*Available as of Camel version 2.19*
 
 *Available as of Camel version 2.19*
 
 
+
 Note: Camel MongoDB3 component Use the Mongo Driver for Java 3.4. If your are 
looking for previews versions look the Camel MongoDB component
 
 According to Wikipedia: "NoSQL is a movement promoting a loosely defined
@@ -72,7 +75,7 @@ mongodb3:connectionBean
 
 with the following path and query parameters:
 
-=== Path Parameters (1 parameters):
+==== Path Parameters (1 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -82,7 +85,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (19 parameters):
+==== Query Parameters (21 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -95,18 +98,20 @@ with the following path and query parameters:
 | *operation* (common) | Sets the operation this endpoint will execute against 
MongoDB. For possible values, see MongoDbOperation. |  | MongoDbOperation
 | *outputType* (common) | Convert the output of the producer to the selected 
type : DocumentList Document or MongoIterable. DocumentList or MongoIterable 
applies to findAll and aggregate. Document applies to all other operations. |  
| MongoDbOutputType
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the 
Camel routing Error Handler, which mean any exceptions occurred while the 
consumer is trying to pickup incoming messages, or the likes, will now be 
processed as a message and handled by the routing Error Handler. By default the 
consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions, that will be logged at WARN or ERROR level and ignored. | false | 
boolean
+| *consumerType* (consumer) | Consumer type. |  | String
 | *exceptionHandler* (consumer) | To let the consumer use a custom 
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this 
option is not in use. By default the consumer will deal with exceptions, that 
will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer 
creates an exchange. |  | ExchangePattern
 | *cursorRegenerationDelay* (advanced) | MongoDB tailable cursors will block 
until new data arrives. If no new data is inserted, after some time the cursor 
will be automatically freed and closed by the MongoDB server. The client is 
expected to regenerate the cursor if needed. This value specifies the time to 
wait before attempting to fetch a new cursor, and if the attempt fails, how 
long before the next attempt is made. Default value is 1000ms. | 1000 | long
 | *dynamicity* (advanced) | Sets whether this endpoint will attempt to 
dynamically resolve the target database and collection from the incoming 
Exchange properties. Can be used to override at runtime the database and 
collection specified on the otherwise static endpoint URI. It is disabled by 
default to boost performance. Enabling it will take a minimal performance hit. 
| false | boolean
 | *synchronous* (advanced) | Sets whether synchronous processing should be 
strictly used, or Camel is allowed to use asynchronous processing (if 
supported). | false | boolean
 | *writeResultAsHeader* (advanced) | In write operations, it determines 
whether instead of returning WriteResult as the body of the OUT message, we 
transfer the IN message to the OUT and attach the WriteResult as a header. | 
false | boolean
+| *streamFilter* (changeStream) | Filter condition for change streams 
consumer. |  | String
 | *persistentId* (tail) | One tail tracking collection can host many trackers 
for several tailable consumers. To keep them separate, each tracker should have 
its own unique persistentId. |  | String
 | *persistentTailTracking* (tail) | Enable persistent tail tracking, which is 
a mechanism to keep track of the last consumed message across system restarts. 
The next time the system is up, the endpoint will recover the cursor from the 
point where it last stopped slurping records. | false | boolean
 | *tailTrackCollection* (tail) | Collection where tail tracking information 
will be persisted. If not specified, 
MongoDbTailTrackingConfig#DEFAULT_COLLECTION will be used by default. |  | 
String
 | *tailTrackDb* (tail) | Indicates what database the tail tracking mechanism 
will persist to. If not specified, the current database will be picked by 
default. Dynamicity will not be taken into account even if enabled, i.e. the 
tail tracking database will not vary past endpoint initialisation. |  | String
 | *tailTrackField* (tail) | Field where the last tracked value will be placed. 
If not specified, MongoDbTailTrackingConfig#DEFAULT_FIELD will be used by 
default. |  | String
-| *tailTrackIncreasingField* (tail) | Correlation field in the incoming record 
which is of increasing nature and will be used to position the tailing cursor 
every time it is generated. The cursor will be (re)created with a query of 
type: tailTrackIncreasingField lastValue (possibly recovered from persistent 
tail tracking). Can be of type Integer, Date, String, etc. NOTE: No support for 
dot notation at the current time, so the field should be at the top level of 
the document. |  | String
+| *tailTrackIncreasingField* (tail) | Correlation field in the incoming record 
which is of increasing nature and will be used to position the tailing cursor 
every time it is generated. The cursor will be (re)created with a query of 
type: tailTrackIncreasingField greater than lastValue (possibly recovered from 
persistent tail tracking). Can be of type Integer, Date, String, etc. NOTE: No 
support for dot notation at the current time, so the field should be at the top 
level of the document. [...]
 |===
 // endpoint options: END
 // spring-boot-auto-configure options: START
@@ -938,6 +943,35 @@ 
from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasin
     .to("mock:test");
 
-----------------------------------------------------------------------------------------------------------------------------------
 
+=== Change Streams Consumer
+
+Change Streams allow applications to access real-time data changes without the 
complexity and risk of tailing the MongoDB oplog.
+Applications can use change streams to subscribe to all data changes on a 
collection and immediately react to them.
+Because change streams use the aggregation framework, applications can also 
filter for specific changes or transform the notifications at will.
+
+To configure Change Streams Consumer you need to specify `consumerType`, 
`database`, `collection`
+and optional JSON property `streamFilter` to filter events.
+That JSON property is standard MongoDB `$match` aggregation.
+It could be easily specified using XML DSL configuration:
+
+[source,xml]
+-------------
+<route id="filterConsumer">
+    <from 
uri="mongodb3:myDb?consumerType=changeStreams&amp;database=flights&amp;collection=tickets"/>
+    <to uri="mock:test"/>
+
+    <routeProperty key="streamFilter" 
value="{'$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]}}"/>
+</route>
+-------------
+
+Java configuration:
+[source,java]
+-------------
+from("mongodb3:myDb?consumerType=changeStreams&database=flights&collection=tickets")
+    .routeProperty("streamFilter", 
"{'$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]}}")
+    .to("mock:test");
+-------------
+
 === Type conversions
 
 The `MongoDbBasicConverters` type converter included with the
@@ -957,4 +991,4 @@ object to a `Map`, which is in turn used to initialise a new
 |=======================================================================
 
 This type converter is auto-discovered, so you don't need to configure
-anything manually.
+anything manually.
\ No newline at end of file
diff --git 
a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoAbstractConsumerThread.java
 
b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoAbstractConsumerThread.java
new file mode 100644
index 0000000..a564171
--- /dev/null
+++ 
b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoAbstractConsumerThread.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import java.util.concurrent.CountDownLatch;
+
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.impl.DefaultConsumer;
+import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class MongoAbstractConsumerThread implements Runnable {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    // local final copies of variables for increased performance
+    protected final long cursorRegenerationDelay;
+    protected final boolean cursorRegenerationDelayEnabled;
+
+    protected final MongoCollection<Document> dbCol;
+    protected final DefaultConsumer consumer;
+    protected final MongoDbEndpoint endpoint;
+    protected MongoCursor cursor;
+
+    volatile boolean keepRunning = true;
+    private volatile boolean stopped;
+    private volatile CountDownLatch stoppedLatch;
+
+    MongoAbstractConsumerThread(MongoDbEndpoint endpoint, DefaultConsumer 
consumer) {
+        this.endpoint = endpoint;
+        this.consumer = consumer;
+        this.dbCol = endpoint.getMongoCollection();
+        this.cursorRegenerationDelay = endpoint.getCursorRegenerationDelay();
+        this.cursorRegenerationDelayEnabled = !(this.cursorRegenerationDelay 
== 0);
+    }
+
+    protected abstract MongoCursor<Document> initializeCursor();
+    protected abstract void init() throws Exception;
+    protected abstract void doRun();
+    protected abstract void regeneratingCursor();
+
+    /**
+     * Main loop.
+     */
+    @Override
+    public void run() {
+        stoppedLatch = new CountDownLatch(1);
+        while (keepRunning) {
+            doRun();
+            // regenerate the cursor, if reading failed for some reason
+            if (keepRunning) {
+                cursor.close();
+                regeneratingCursor();
+
+                if (cursorRegenerationDelayEnabled) {
+                    try {
+                        Thread.sleep(cursorRegenerationDelay);
+                    } catch (InterruptedException ignored) {
+                    }
+                }
+
+                cursor = initializeCursor();
+            }
+        }
+
+        stopped = true;
+        stoppedLatch.countDown();
+    }
+
+    protected void stop() throws Exception {
+        if (log.isInfoEnabled()) {
+            log.info("Stopping MongoDB Tailable Cursor consumer, bound to 
collection: {}",
+                    String.format("db: %s, col: %s", endpoint.getDatabase(), 
endpoint.getCollection()));
+        }
+
+        keepRunning = false;
+        if (cursor != null) {
+            cursor.close();
+        }
+        awaitStopped();
+
+        if (log.isInfoEnabled()) {
+            log.info("Stopped MongoDB Tailable Cursor consumer, bound to 
collection: {}",
+                    String.format("db: %s, col: %s", endpoint.getDatabase(), 
endpoint.getCollection()));
+        }
+    }
+
+    private void awaitStopped() throws InterruptedException {
+        if (!stopped) {
+            log.info("Going to wait for stopping");
+            stoppedLatch.await();
+        }
+    }
+}
diff --git 
a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
 
b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbChangeStreamsConsumer.java
similarity index 61%
copy from 
components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
copy to 
components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbChangeStreamsConsumer.java
index e0a2f9f..933c576 100644
--- 
a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
+++ 
b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbChangeStreamsConsumer.java
@@ -16,20 +16,28 @@
  */
 package org.apache.camel.component.mongodb3;
 
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 
+import static java.util.Collections.singletonList;
+
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
+import org.bson.BsonDocument;
 
 /**
- * The MongoDb consumer.
+ * The MongoDb Change Streams consumer.
  */
-public class MongoDbTailableCursorConsumer extends DefaultConsumer {
+public class MongoDbChangeStreamsConsumer extends DefaultConsumer {
+
+    private static final String STREAM_FILTER_PROPERTY = "streamFilter";
+
     private final MongoDbEndpoint endpoint;
     private ExecutorService executor;
-    private MongoDbTailingProcess tailingProcess;
+    private MongoDbChangeStreamsThread changeStreamsThread;
 
-    public MongoDbTailableCursorConsumer(MongoDbEndpoint endpoint, Processor 
processor) {
+    public MongoDbChangeStreamsConsumer(MongoDbEndpoint endpoint, Processor 
processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
     }
@@ -37,8 +45,8 @@ public class MongoDbTailableCursorConsumer extends 
DefaultConsumer {
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        if (tailingProcess != null) {
-            tailingProcess.stop();
+        if (changeStreamsThread != null) {
+            changeStreamsThread.stop();
         }
         if (executor != null) {
             
endpoint.getCamelContext().getExecutorServiceManager().shutdown(executor);
@@ -49,17 +57,15 @@ public class MongoDbTailableCursorConsumer extends 
DefaultConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        executor = 
endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, 
endpoint.getEndpointUri(), 1);
-        MongoDbTailTrackingManager trackingManager = initTailTracking();
-        tailingProcess = new MongoDbTailingProcess(endpoint, this, 
trackingManager);
-        tailingProcess.initializeProcess();
-        executor.execute(tailingProcess);
-    }
+        String streamFilter = (String) 
getRoute().getProperties().get(STREAM_FILTER_PROPERTY);
+        List<BsonDocument> bsonFilter = null;
+        if (ObjectHelper.isNotEmpty(streamFilter)) {
+            bsonFilter = singletonList(BsonDocument.parse(streamFilter));
+        }
 
-    protected MongoDbTailTrackingManager initTailTracking() throws Exception {
-        MongoDbTailTrackingManager answer = new 
MongoDbTailTrackingManager(endpoint.getMongoConnection(), 
endpoint.getTailTrackingConfig());
-        answer.initialize();
-        return answer;
+        executor = 
endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, 
endpoint.getEndpointUri(), 1);
+        changeStreamsThread = new MongoDbChangeStreamsThread(endpoint, this, 
bsonFilter);
+        changeStreamsThread.init();
+        executor.execute(changeStreamsThread);
     }
-
 }
diff --git 
a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbChangeStreamsThread.java
 
b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbChangeStreamsThread.java
new file mode 100644
index 0000000..0f08b35
--- /dev/null
+++ 
b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbChangeStreamsThread.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import java.util.List;
+
+import com.mongodb.MongoException;
+import com.mongodb.client.ChangeStreamIterable;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.changestream.ChangeStreamDocument;
+import org.apache.camel.Exchange;
+import org.bson.BsonDocument;
+import org.bson.Document;
+
+import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
+
+class MongoDbChangeStreamsThread extends MongoAbstractConsumerThread {
+    private List<BsonDocument> bsonFilter;
+    private BsonDocument resumeToken;
+
+    MongoDbChangeStreamsThread(MongoDbEndpoint endpoint, 
MongoDbChangeStreamsConsumer consumer, List<BsonDocument> bsonFilter) {
+        super(endpoint, consumer);
+        this.bsonFilter = bsonFilter;
+    }
+
+    @Override
+    protected void init() {
+        cursor = initializeCursor();
+    }
+
+    @Override
+    protected MongoCursor initializeCursor() {
+        ChangeStreamIterable<Document> iterable = bsonFilter != null
+                ? dbCol.watch(bsonFilter)
+                : dbCol.watch();
+
+        if (resumeToken != null) {
+            iterable = iterable.resumeAfter(resumeToken);
+        }
+
+        MongoCursor<ChangeStreamDocument<Document>> cursor = 
iterable.iterator();
+        return cursor;
+    }
+
+    @Override
+    protected void regeneratingCursor() {
+        if (log.isDebugEnabled()) {
+            log.debug("Regenerating cursor, waiting {}ms first", 
cursorRegenerationDelay);
+        }
+    }
+
+    @Override
+    protected void doRun() {
+        try {
+            while (cursor.hasNext() && keepRunning) {
+                ChangeStreamDocument<Document> dbObj = 
(ChangeStreamDocument<Document>) cursor.next();
+                Exchange exchange = 
endpoint.createMongoDbExchange(dbObj.getFullDocument());
+
+                try {
+                    if (log.isTraceEnabled()) {
+                        log.trace("Sending exchange: {}, ObjectId: {}", 
exchange, dbObj.getFullDocument().get(MONGO_ID));
+                    }
+                    consumer.getProcessor().process(exchange);
+                } catch (Exception ignored) {
+                }
+
+                this.resumeToken = dbObj.getResumeToken();
+            }
+        } catch (MongoException e) {
+            // cursor.hasNext() opens socket and waiting for data
+            // it throws exception when cursor is closed in another thread
+            // there is no way to stop hasNext() before closing cursor
+            if (keepRunning) {
+                throw e;
+            } else {
+                log.debug("Exception from MongoDB, will regenerate cursor.", 
e);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbConsumerType.java
 
b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbConsumerType.java
index 6cfe084..890d737 100644
--- 
a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbConsumerType.java
+++ 
b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbConsumerType.java
@@ -18,7 +18,7 @@ package org.apache.camel.component.mongodb3;
 
 public enum MongoDbConsumerType {
 
-    tailable
+    tailable,
+    changeStreams
     // more consumer types to be included in future versions
-
 }
diff --git 
a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbEndpoint.java
 
b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbEndpoint.java
index 1a3e419..9b3613d 100644
--- 
a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbEndpoint.java
+++ 
b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbEndpoint.java
@@ -30,6 +30,7 @@ import com.mongodb.WriteConcern;
 import com.mongodb.WriteResult;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.MongoIterable;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -40,26 +41,24 @@ import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
+
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.ObjectHelper;
+
 import org.bson.Document;
 import org.bson.conversions.Bson;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.camel.component.mongodb3.MongoDbOperation.command;
 import static org.apache.camel.component.mongodb3.MongoDbOperation.findAll;
 import static org.apache.camel.component.mongodb3.MongoDbOperation.getDbStats;
-import static org.apache.camel.component.mongodb3.MongoDbOperation.valueOf;
-import static org.apache.camel.component.mongodb3.MongoDbOutputType.Document;
 import static 
org.apache.camel.component.mongodb3.MongoDbOutputType.DocumentList;
-import static 
org.apache.camel.component.mongodb3.MongoDbOutputType.MongoIterable;
 
 /**
  * Component for working with documents stored in MongoDB database.
  */
 @UriEndpoint(firstVersion = "2.19.0", scheme = "mongodb3", title = "MongoDB", 
syntax = "mongodb3:connectionBean",
-    consumerClass = MongoDbTailableCursorConsumer.class, label = 
"database,nosql")
+    label = "database,nosql")
 public class MongoDbEndpoint extends DefaultEndpoint {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MongoDbEndpoint.class);
@@ -84,13 +83,16 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     private boolean dynamicity;
     @UriParam(label = "advanced")
     private boolean writeResultAsHeader;
-    // tailable cursor consumer by default
-    private MongoDbConsumerType consumerType;
+    @UriParam(label = "consumer")
+    private String consumerType;
     @UriParam(label = "advanced", defaultValue = "1000")
     private long cursorRegenerationDelay = 1000L;
     @UriParam(label = "tail")
     private String tailTrackIncreasingField;
 
+    @UriParam(label = "changeStream")
+    private String streamFilter;
+
     // persistent tail tracking
     @UriParam(label = "tail")
     private boolean persistentTailTracking;
@@ -104,14 +106,15 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     private String tailTrackField;
     @UriParam(label = "common")
     private MongoDbOutputType outputType;
-    
+
+    // tailable cursor consumer by default
+    private MongoDbConsumerType dbConsumerType;
+
     private MongoDbTailTrackingConfig tailTrackingConfig;
 
     private MongoDatabase mongoDatabase;
     private MongoCollection<Document> mongoCollection;
 
-    // ======= Constructors ===============================================
-
     public MongoDbEndpoint() {
     }
 
@@ -119,14 +122,14 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         super(uri, component);
     }
 
-    // ======= Implementation methods =====================================
-
-    public Producer createProducer() throws Exception {
+    @Override
+    public Producer createProducer() {
         validateProducerOptions();
         initializeConnection();
         return new MongoDbProducer(this);
     }
 
+    @Override
     public Consumer createConsumer(Processor processor) throws Exception {
         validateConsumerOptions();
 
@@ -135,15 +138,25 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         initializeConnection();
 
         // select right consumer type
-        if (consumerType == null) {
-            consumerType = MongoDbConsumerType.tailable;
+        try {
+            dbConsumerType = ObjectHelper.isEmpty(consumerType)
+                    ? MongoDbConsumerType.tailable
+                    : MongoDbConsumerType.valueOf(consumerType);
+        } catch (Exception e) {
+            throw new CamelMongoDbException("Consumer type not supported: " + 
consumerType, e);
         }
 
         Consumer consumer;
-        if (consumerType == MongoDbConsumerType.tailable) {
+
+        switch (dbConsumerType) {
+        case tailable:
             consumer = new MongoDbTailableCursorConsumer(this, processor);
-        } else {
-            throw new CamelMongoDbException("Consumer type not supported: " + 
consumerType);
+            break;
+        case changeStreams:
+            consumer = new MongoDbChangeStreamsConsumer(this, processor);
+            break;
+        default:
+            throw new CamelMongoDbException("Consumer type not supported: " + 
dbConsumerType);
         }
 
         configureConsumer(consumer);
@@ -161,10 +174,10 @@ public class MongoDbEndpoint extends DefaultEndpoint {
             if (DocumentList.equals(outputType) && 
!(findAll.equals(operation))) {
                 throw new IllegalArgumentException("outputType DocumentList is 
only compatible with operation findAll");
             }
-            if (MongoIterable.equals(outputType) && 
!(findAll.equals(operation))) {
+            if (MongoDbOutputType.MongoIterable.equals(outputType) && 
!(findAll.equals(operation))) {
                 throw new IllegalArgumentException("outputType MongoIterable 
is only compatible with operation findAll");
             }
-            if (Document.equals(outputType) && (findAll.equals(operation))) {
+            if (MongoDbOutputType.Document.equals(outputType) && 
(findAll.equals(operation))) {
                 throw new IllegalArgumentException("outputType Document is not 
compatible with operation findAll");
             }
         }
@@ -174,9 +187,9 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         // make our best effort to validate, options with defaults are checked
         // against their defaults, which is not always a guarantee that
         // they haven't been explicitly set, but it is enough
-        if (!ObjectHelper.isEmpty(consumerType) || persistentTailTracking || 
!ObjectHelper.isEmpty(tailTrackDb) || !ObjectHelper.isEmpty(tailTrackCollection)
+        if (!ObjectHelper.isEmpty(dbConsumerType) || persistentTailTracking || 
!ObjectHelper.isEmpty(tailTrackDb) || !ObjectHelper.isEmpty(tailTrackCollection)
             || !ObjectHelper.isEmpty(tailTrackField) || 
cursorRegenerationDelay != 1000L) {
-            throw new IllegalArgumentException("consumerType, tailTracking, 
cursorRegenerationDelay options cannot appear on a producer endpoint");
+            throw new IllegalArgumentException("dbConsumerType, tailTracking, 
cursorRegenerationDelay options cannot appear on a producer endpoint");
         }
     }
 
@@ -187,7 +200,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         if (!ObjectHelper.isEmpty(operation) || dynamicity || outputType != 
null) {
             throw new IllegalArgumentException("operation, dynamicity, 
outputType " + "options cannot appear on a consumer endpoint");
         }
-        if (consumerType == MongoDbConsumerType.tailable) {
+        if (dbConsumerType == MongoDbConsumerType.tailable) {
             if (tailTrackIncreasingField == null) {
                 throw new IllegalArgumentException("tailTrackIncreasingField 
option must be set for tailable cursor MongoDB consumer endpoint");
             }
@@ -209,7 +222,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
      */
     public void initializeConnection() throws CamelMongoDbException {
         LOG.info("Initialising MongoDb endpoint: {}", this);
-        if (database == null || (collection == null && 
!(getDbStats.equals(operation) || command.equals(operation)))) {
+        if (database == null || (collection == null && 
!(getDbStats.equals(operation) || MongoDbOperation.command.equals(operation)))) 
{
             throw new CamelMongoDbException("Missing required endpoint 
configuration: database and/or collection");
         }
         mongoDatabase = mongoConnection.getDatabase(database);
@@ -301,7 +314,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         LOG.debug("Resolved the connection with the name {} as {}", 
connectionBean, mongoConnection);
         super.doStart();
     }
-    
+
     // ======= Getters and setters
     // ===============================================
 
@@ -350,7 +363,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
      */
     public void setOperation(String operation) throws CamelMongoDbException {
         try {
-            this.operation = valueOf(operation);
+            this.operation = MongoDbOperation.valueOf(operation);
         } catch (IllegalArgumentException e) {
             throw new CamelMongoDbException("Operation not supported", e);
         }
@@ -456,21 +469,32 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     /**
      * Reserved for future use, when more consumer types are supported.
      *
-     * @param consumerType key of the consumer type
-     * @throws CamelMongoDbException
+     * @param dbConsumerType key of the consumer type
+     * @throws CamelMongoDbException if consumer type is not supported
      */
-    public void setConsumerType(String consumerType) throws 
CamelMongoDbException {
+    public void setDbConsumerType(String dbConsumerType) throws 
CamelMongoDbException {
         try {
-            this.consumerType = MongoDbConsumerType.valueOf(consumerType);
+            this.dbConsumerType = MongoDbConsumerType.valueOf(dbConsumerType);
         } catch (IllegalArgumentException e) {
             throw new CamelMongoDbException("Consumer type not supported", e);
         }
     }
 
-    public MongoDbConsumerType getConsumerType() {
+    public MongoDbConsumerType getDbConsumerType() {
+        return dbConsumerType;
+    }
+
+    public String getConsumerType() {
         return consumerType;
     }
 
+    /**
+     * Consumer type.
+     */
+    public void setConsumerType(String consumerType) {
+        this.consumerType = consumerType;
+    }
+
     public String getTailTrackDb() {
         return tailTrackDb;
     }
@@ -536,7 +560,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
      * Correlation field in the incoming record which is of increasing nature
      * and will be used to position the tailing cursor every time it is
      * generated. The cursor will be (re)created with a query of type:
-     * tailTrackIncreasingField > lastValue (possibly recovered from persistent
+     * tailTrackIncreasingField greater than lastValue (possibly recovered 
from persistent
      * tail tracking). Can be of type Integer, Date, String, etc. NOTE: No
      * support for dot notation at the current time, so the field should be at
      * the top level of the document.
@@ -628,4 +652,16 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     public MongoCollection<Document> getMongoCollection() {
         return mongoCollection;
     }
+
+    public String getStreamFilter() {
+        return streamFilter;
+    }
+
+    /**
+     * Filter condition for change streams consumer.
+     */
+    public void setStreamFilter(String streamFilter) {
+        this.streamFilter = streamFilter;
+    }
+
 }
diff --git 
a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
 
b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
index e0a2f9f..24fae51 100644
--- 
a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
+++ 
b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
@@ -27,7 +27,7 @@ import org.apache.camel.impl.DefaultConsumer;
 public class MongoDbTailableCursorConsumer extends DefaultConsumer {
     private final MongoDbEndpoint endpoint;
     private ExecutorService executor;
-    private MongoDbTailingProcess tailingProcess;
+    private MongoDbTailingThread tailingThread;
 
     public MongoDbTailableCursorConsumer(MongoDbEndpoint endpoint, Processor 
processor) {
         super(endpoint, processor);
@@ -37,8 +37,8 @@ public class MongoDbTailableCursorConsumer extends 
DefaultConsumer {
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        if (tailingProcess != null) {
-            tailingProcess.stop();
+        if (tailingThread != null) {
+            tailingThread.stop();
         }
         if (executor != null) {
             
endpoint.getCamelContext().getExecutorServiceManager().shutdown(executor);
@@ -51,9 +51,9 @@ public class MongoDbTailableCursorConsumer extends 
DefaultConsumer {
         super.doStart();
         executor = 
endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, 
endpoint.getEndpointUri(), 1);
         MongoDbTailTrackingManager trackingManager = initTailTracking();
-        tailingProcess = new MongoDbTailingProcess(endpoint, this, 
trackingManager);
-        tailingProcess.initializeProcess();
-        executor.execute(tailingProcess);
+        tailingThread = new MongoDbTailingThread(endpoint, this, 
trackingManager);
+        tailingThread.init();
+        executor.execute(tailingThread);
     }
 
     protected MongoDbTailTrackingManager initTailTracking() throws Exception {
@@ -61,5 +61,4 @@ public class MongoDbTailableCursorConsumer extends 
DefaultConsumer {
         answer.initialize();
         return answer;
     }
-
 }
diff --git 
a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java
 
b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java
deleted file mode 100644
index 40e8be2..0000000
--- 
a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.mongodb3;
-
-import java.util.concurrent.CountDownLatch;
-
-import com.mongodb.CursorType;
-import com.mongodb.MongoCursorNotFoundException;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoCursor;
-import org.apache.camel.Exchange;
-import org.bson.Document;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.mongodb.client.model.Filters.gt;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
-
-public class MongoDbTailingProcess implements Runnable {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(MongoDbTailingProcess.class);
-    private static final String CAPPED_KEY = "capped";
-
-    public volatile boolean keepRunning = true;
-    public volatile boolean stopped; // = false
-    private volatile CountDownLatch stoppedLatch;
-
-    private final MongoCollection<Document> dbCol;
-    private final MongoDbEndpoint endpoint;
-    private final MongoDbTailableCursorConsumer consumer;
-
-    // create local, final copies of these variables for increased performance
-    private final long cursorRegenerationDelay;
-    private final boolean cursorRegenerationDelayEnabled;
-
-    private MongoCursor<Document> cursor;
-    private MongoDbTailTrackingManager tailTracking;
-
-    public MongoDbTailingProcess(MongoDbEndpoint endpoint, 
MongoDbTailableCursorConsumer consumer, MongoDbTailTrackingManager tailTrack) {
-        this.endpoint = endpoint;
-        this.consumer = consumer;
-        this.dbCol = endpoint.getMongoCollection();
-        this.tailTracking = tailTrack;
-        this.cursorRegenerationDelay = endpoint.getCursorRegenerationDelay();
-        this.cursorRegenerationDelayEnabled = !(this.cursorRegenerationDelay 
== 0);
-    }
-
-    public MongoCursor<Document> getCursor() {
-        return cursor;
-    }
-
-    /**
-     * Initialise the tailing process, the cursor and if persistent tail
-     * tracking is enabled, recover the cursor from the persisted point. As 
part
-     * of the initialisation process, the component will validate that the
-     * collection we are targeting is 'capped'.
-     *
-     * @throws Exception
-     */
-    public void initializeProcess() throws Exception {
-        if (LOG.isInfoEnabled()) {
-            LOG.info("Starting MongoDB Tailable Cursor consumer, binding to 
collection: {}", "db: " + endpoint.getMongoDatabase() + ", col: " + 
endpoint.getCollection());
-        }
-
-        if (!isCollectionCapped()) {
-            throw new CamelMongoDbException("Tailable cursors are only 
compatible with capped collections, and collection " + endpoint.getCollection() 
+ " is not capped");
-        }
-        try {
-            // recover the last value from the store if it exists
-            tailTracking.recoverFromStore();
-            cursor = initializeCursor();
-        } catch (Exception e) {
-            throw new CamelMongoDbException("Exception occurred while 
initializing tailable cursor", e);
-        }
-
-        if (cursor == null) {
-            throw new CamelMongoDbException("Tailable cursor was not 
initialized, or cursor returned is dead on arrival");
-        }
-
-    }
-
-    private Boolean isCollectionCapped() {
-        return 
endpoint.getMongoDatabase().runCommand(createCollStatsCommand()).getBoolean(CAPPED_KEY);
-    }
-
-    private Document createCollStatsCommand() {
-        return new Document("collStats", endpoint.getCollection());
-    }
-
-    /**
-     * The heart of the tailing process.
-     */
-    @Override
-    public void run() {
-        stoppedLatch = new CountDownLatch(1);
-        while (keepRunning) {
-            doRun();
-            // if the previous call didn't return because we have stopped
-            // running, then regenerate the cursor
-            if (keepRunning) {
-                cursor.close();
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Regenerating cursor with lastVal: {}, waiting 
{} ms first", tailTracking.lastVal, cursorRegenerationDelay);
-                }
-
-                if (cursorRegenerationDelayEnabled) {
-                    try {
-                        Thread.sleep(cursorRegenerationDelay);
-                    } catch (InterruptedException e) {
-                        // ignore
-                    }
-                }
-
-                cursor = initializeCursor();
-            }
-        }
-
-        stopped = true;
-        stoppedLatch.countDown();
-    }
-
-    protected void stop() throws Exception {
-        if (LOG.isInfoEnabled()) {
-            LOG.info("Stopping MongoDB Tailable Cursor consumer, bound to 
collection: {}", "db: " + endpoint.getDatabase() + ", col: " + 
endpoint.getCollection());
-        }
-        keepRunning = false;
-        // close the cursor if it's open, so if it is blocked on hasNext() it
-        // will return immediately
-        if (cursor != null) {
-            cursor.close();
-        }
-        awaitStopped();
-        if (LOG.isInfoEnabled()) {
-            LOG.info("Stopped MongoDB Tailable Cursor consumer, bound to 
collection: {}", "db: " + endpoint.getDatabase() + ", col: " + 
endpoint.getCollection());
-        }
-    }
-
-    /**
-     * The heart of the tailing process.
-     */
-    private void doRun() {
-        // while the cursor has more values, keepRunning is true and the
-        // cursorId is not 0, which symbolizes that the cursor is dead
-        try {
-            while (cursor.hasNext() && keepRunning) { // cursor.getCursorId() 
!=
-                                                      // 0 &&
-                Document dbObj = cursor.next();
-                Exchange exchange = endpoint.createMongoDbExchange(dbObj);
-                try {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Sending exchange: {}, ObjectId: {}", 
exchange, dbObj.get(MONGO_ID));
-                    }
-                    consumer.getProcessor().process(exchange);
-                } catch (Exception e) {
-                    // do nothing
-                }
-                tailTracking.setLastVal(dbObj);
-            }
-        } catch (MongoCursorNotFoundException e) {
-            // we only log the warning if we are not stopping, otherwise it is
-            // expected because the stop() method kills the cursor just in case
-            // it is blocked
-            // waiting for more data to arrive
-            if (keepRunning) {
-                LOG.debug("Cursor not found exception from MongoDB, will 
regenerate cursor. This is normal behaviour with tailable cursors.", e);
-            }
-        } catch (IllegalStateException e) {
-            // this is happening when the consumer is stopped or the mongo 
interrupted (ie, junit ending test)
-            // as we cannot resume, we shutdown the thread gracefully
-            LOG.info("Cursor was closed, likely the consumer was stopped and 
closed the cursor on purpose.", e);
-            if (cursor != null) {
-                cursor.close();
-            }
-            keepRunning = false;
-        } finally {
-            // the loop finished, persist the lastValue just in case we are 
shutting down
-            // TODO: perhaps add a functionality to persist every N records
-            tailTracking.persistToStore();
-        }
-    }
-
-    // no arguments, will ask DB what the last updated Id was (checking
-    // persistent storage)
-    private MongoCursor<Document> initializeCursor() {
-        Object lastVal = tailTracking.lastVal;
-        // lastVal can be null if we are initializing and there is no
-        // persistence enabled
-        MongoCursor<Document> answer;
-        if (lastVal == null) {
-            answer = 
dbCol.find().cursorType(CursorType.TailableAwait).iterator();
-        } else {
-            MongoCursor<Document> iterator = 
dbCol.find(gt(tailTracking.getIncreasingFieldName(), 
lastVal)).cursorType(CursorType.TailableAwait).iterator();
-            answer = iterator;
-        }
-        return answer;
-    }
-
-    private void awaitStopped() throws InterruptedException {
-        if (!stopped) {
-            LOG.info("Going to wait for stopping");
-            stoppedLatch.await();
-        }
-    }
-
-}
diff --git 
a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingThread.java
 
b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingThread.java
new file mode 100644
index 0000000..e2387ce
--- /dev/null
+++ 
b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingThread.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import com.mongodb.CursorType;
+import com.mongodb.MongoCursorNotFoundException;
+import com.mongodb.client.MongoCursor;
+import org.apache.camel.Exchange;
+import org.bson.Document;
+
+import static com.mongodb.client.model.Filters.gt;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
+
+class MongoDbTailingThread extends MongoAbstractConsumerThread {
+
+    private static final String CAPPED_KEY = "capped";
+    private MongoDbTailTrackingManager tailTracking;
+
+    MongoDbTailingThread(MongoDbEndpoint endpoint, 
MongoDbTailableCursorConsumer consumer, MongoDbTailTrackingManager tailTrack) {
+        super(endpoint, consumer);
+        this.tailTracking = tailTrack;
+    }
+
+    /**
+     * Initialise the tailing process, the cursor and if persistent tail 
tracking is enabled,
+     * recover the cursor from the persisted point.
+     * As part of the initialisation process,
+     * the component will validate that the collection we are targeting is 
'capped'.
+     */
+    @Override
+    protected void init() {
+        if (log.isInfoEnabled()) {
+            log.info("Starting MongoDB Tailable Cursor consumer, binding to 
collection: {}",
+                    String.format("db: %s, col: %s", 
endpoint.getMongoDatabase(), endpoint.getCollection()));
+        }
+
+        if (!isCollectionCapped()) {
+            throw new CamelMongoDbException(
+                    String.format("Tailable cursors are only compatible with 
capped collections, and collection %s is not capped",
+                            endpoint.getCollection()));
+        }
+        try {
+            // recover the last value from the store if it exists
+            tailTracking.recoverFromStore();
+            cursor = initializeCursor();
+        } catch (Exception e) {
+            throw new CamelMongoDbException("Exception occurred while 
initializing tailable cursor", e);
+        }
+
+        if (cursor == null) {
+            throw new CamelMongoDbException("Tailable cursor was not 
initialized, or cursor returned is dead on arrival");
+        }
+    }
+
+    private Boolean isCollectionCapped() {
+        return 
endpoint.getMongoDatabase().runCommand(createCollStatsCommand()).getBoolean(CAPPED_KEY);
+    }
+
+    private Document createCollStatsCommand() {
+        return new Document("collStats", endpoint.getCollection());
+    }
+
+    @Override
+    // no arguments, will ask DB what the last updated Id was (checking 
persistent storage)
+    protected MongoCursor<Document> initializeCursor() {
+        Object lastVal = tailTracking.lastVal;
+        // lastVal can be null if we are initializing and there is no 
persistence enabled
+        MongoCursor<Document> answer;
+        if (lastVal == null) {
+            answer = 
dbCol.find().cursorType(CursorType.TailableAwait).iterator();
+        } else {
+            MongoCursor<Document> iterator = 
dbCol.find(gt(tailTracking.getIncreasingFieldName(), lastVal))
+                    .cursorType(CursorType.TailableAwait)
+                    .iterator();
+            answer = iterator;
+        }
+        return answer;
+    }
+
+    @Override
+    protected void regeneratingCursor() {
+        if (log.isDebugEnabled()) {
+            log.debug("Regenerating cursor with lastVal: {}, waiting {} ms 
first", tailTracking.lastVal, cursorRegenerationDelay);
+        }
+    }
+
+    /**
+     * The heart of the tailing process.
+     */
+    @Override
+    protected void doRun() {
+        // while the cursor has more values, keepRunning is true and the
+        // cursorId is not 0, which symbolizes that the cursor is dead
+        try {
+            while (cursor.hasNext() && keepRunning) {
+                Document dbObj = (Document) cursor.next();
+                Exchange exchange = endpoint.createMongoDbExchange(dbObj);
+                try {
+                    if (log.isTraceEnabled()) {
+                        log.trace("Sending exchange: {}, ObjectId: {}", 
exchange, dbObj.get(MONGO_ID));
+                    }
+                    consumer.getProcessor().process(exchange);
+                } catch (Exception e) {
+                    // do nothing
+                }
+                tailTracking.setLastVal(dbObj);
+            }
+        } catch (MongoCursorNotFoundException e) {
+            // we only log the warning if we are not stopping, otherwise it is
+            // expected because the stop() method kills the cursor just in case
+            // it is blocked
+            // waiting for more data to arrive
+            if (keepRunning) {
+                log.debug("Cursor not found exception from MongoDB, will 
regenerate cursor. This is normal behaviour with tailable cursors.", e);
+            }
+        } catch (IllegalStateException e) {
+            // this is happening when the consumer is stopped or the mongo 
interrupted (ie, junit ending test)
+            // as we cannot resume, we shutdown the thread gracefully
+            log.info("Cursor was closed, likely the consumer was stopped and 
closed the cursor on purpose.", e);
+            if (cursor != null) {
+                cursor.close();
+            }
+            keepRunning = false;
+        } finally {
+            // the loop finished, persist the lastValue just in case we are 
shutting down
+            // TODO: perhaps add a functionality to persist every N records
+            tailTracking.persistToStore();
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/EmbedMongoConfiguration.java
 
b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/EmbedMongoConfiguration.java
index 03378c8..08c5630 100644
--- 
a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/EmbedMongoConfiguration.java
+++ 
b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/EmbedMongoConfiguration.java
@@ -28,6 +28,9 @@ import de.flapdoodle.embed.mongo.MongodStarter;
 import de.flapdoodle.embed.mongo.config.IMongodConfig;
 import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
 import de.flapdoodle.embed.mongo.config.Net;
+import de.flapdoodle.embed.mongo.config.Storage;
+
+import org.bson.Document;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -44,9 +47,19 @@ public class EmbedMongoConfiguration {
 
     static {
         try {
-            IMongodConfig mongodConfig = new 
MongodConfigBuilder().version(PRODUCTION).net(new Net(PORT, 
localhostIsIPv6())).build();
+            IMongodConfig mongodConfig = new MongodConfigBuilder()
+                    .version(PRODUCTION)
+                    .net(new Net(PORT, localhostIsIPv6()))
+                    .replication(new Storage(null, "replicationName", 5000))
+                    .build();
+
             MongodExecutable mongodExecutable = 
MongodStarter.getDefaultInstance().prepare(mongodConfig);
             mongodExecutable.start();
+
+            // init replica set
+            MongoClient client = new MongoClient("localhost", PORT);
+            client.getDatabase("admin").runCommand(new 
Document("replSetInitiate", new Document()));
+
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
diff --git 
a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbChangeStreamsConsumerTest.java
 
b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbChangeStreamsConsumerTest.java
new file mode 100644
index 0000000..6c28ac7
--- /dev/null
+++ 
b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbChangeStreamsConsumerTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.CreateCollectionOptions;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.bson.Document;
+import org.junit.Test;
+
+public class MongoDbChangeStreamsConsumerTest extends AbstractMongoDbTest {
+
+    private MongoCollection<Document> mongoCollection;
+    private String collectionName;
+
+    @Override
+    public void doPostSetup() {
+        super.doPostSetup();
+
+        collectionName = "camelTest";
+        mongoCollection = db.getCollection(collectionName, Document.class);
+        mongoCollection.drop();
+
+        CreateCollectionOptions collectionOptions = new 
CreateCollectionOptions();
+        db.createCollection(collectionName, collectionOptions);
+        mongoCollection = db.getCollection(collectionName, Document.class);
+    }
+
+    @Test
+    public void basicTest() throws Exception {
+        assertEquals(0, mongoCollection.countDocuments());
+        MockEndpoint mock = getMockEndpoint("mock:test");
+        mock.expectedMessageCount(10);
+
+        String consumerRouteId = "simpleConsumer";
+        addTestRoutes();
+        context.getRouteController().startRoute(consumerRouteId);
+
+        Thread t = new Thread(() -> {
+            for (int i = 0; i < 10; i++) {
+                mongoCollection.insertOne(new Document("increasing", 
i).append("string", "value" + i));
+            }
+        });
+
+        t.start();
+        t.join();
+
+        mock.assertIsSatisfied();
+        context.getRouteController().stopRoute(consumerRouteId);
+    }
+
+    @Test
+    public void filterTest() throws Exception {
+        assertEquals(0, mongoCollection.countDocuments());
+        MockEndpoint mock = getMockEndpoint("mock:test");
+        mock.expectedMessageCount(1);
+
+        String consumerRouteId = "filterConsumer";
+        addTestRoutes();
+        context.getRouteController().startRoute(consumerRouteId);
+
+        Thread t = new Thread(() -> {
+            for (int i = 0; i < 10; i++) {
+                mongoCollection.insertOne(new Document("increasing", 
i).append("string", "value" + i));
+            }
+        });
+
+        t.start();
+        t.join();
+
+        mock.assertIsSatisfied();
+
+        Document actualDocument = 
mock.getExchanges().get(0).getIn().getBody(Document.class);
+        assertEquals("value2", actualDocument.get("string"));
+
+        context.getRouteController().stopRoute(consumerRouteId);
+    }
+
+
+    protected void addTestRoutes() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                
from("mongodb3:myDb?consumerType=changeStreams&database={{mongodb.testDb}}&collection={{mongodb.testCollection}}")
+                        .id("simpleConsumer")
+                        .autoStartup(false)
+                        .to("mock:test");
+
+                
from("mongodb3:myDb?consumerType=changeStreams&database={{mongodb.testDb}}&collection={{mongodb.testCollection}}")
+                        .routeProperty("streamFilter", 
"{'$match':{'$or':[{'fullDocument.string': 'value2'}]}}")
+                        .id("filterConsumer")
+                        .autoStartup(false)
+                        .to("mock:test");
+            }
+        });
+    }
+}
\ No newline at end of file

Reply via email to