Hello folks,

I have tried to apply the solution you suggested. I have declared the MongoDB CDC source as:
MongoDBSource.<T>builder()
    .hosts(hostname)
    .scheme(SCHEME)
    .databaseList(dbName)
    .collectionList(dbName + "." + collectionName)
    .username(username)
    .password(password)
    .startupOptions(StartupOptions.initial())
    .batchSize(2048)
    .connectionOptions("heartbeat.interval.ms=5000")
    .closeIdleReaders(true)
    .deserializer(...)
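For completeness, the source is wired into the job roughly as below. This is a minimal sketch, not our exact code: the checkpoint interval and sink are placeholders, and `source` stands for the MongoDBSource<T> built above (deserializer filled in).

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Checkpointing is what persists the change stream resume token;
    // after a restart the connector resumes from the checkpointed token.
    env.enableCheckpointing(60_000L); // illustrative interval, not our exact value

    env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB CDC Source")
        .print(); // placeholder sink

    env.execute("mongodb-cdc-job");
}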
However, after around a week of this stream running, I get the following error again:

com.mongodb.MongoCommandException: Command failed with error 286 (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog.' on server xxxxx.yyy.mongodb.net:27017. The full response is {"errorLabels": ["NonResumableChangeStreamError"], "ok": 0.0, "errmsg": "PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog.", "code": 286, "codeName": "ChangeStreamHistoryLost", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1746774243, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "wdywOjmcLPIkhRxA0PsD2JHXzxs=", "subType": "00"}}, "keyId": 7448674344508588123}}, "operationTime": {"$timestamp": {"t": 1746774243, "i": 1}}}
    at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:205)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:443)
    at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:365)
    at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114)
    at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:643)
    at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:73)
    at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:204)
    at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:122)
    at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:87)
    at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:76)
    at com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:288)
    at com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecute(CommandOperationHelper.java:239)
    at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$4(CommandOperationHelper.java:220)
    at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$0(OperationHelper.java:358)
    at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:383)
    at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$1(OperationHelper.java:357)
    at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:383)
    at com.mongodb.internal.operation.OperationHelper.withSourceAndConnection(OperationHelper.java:356)
    at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$5(CommandOperationHelper.java:218)
    at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67)
    at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:223)
    at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:204)
    at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:191)
    at com.mongodb.internal.operation.ChangeStreamOperation.lambda$execute$0(ChangeStreamOperation.java:187)
    at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:321)
    at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:185)
    at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:55)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:185)
    at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:212)
    at com.mongodb.client.internal.ChangeStreamIterableImpl.access$000(ChangeStreamIterableImpl.java:55)
    at com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:139)
    at com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:131)
    at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:286)
    at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:248)
    at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.execute(MongoDBStreamFetchTask.java:104)
    at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
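To sanity-check that the checkpointed offset had aged out of the oplog, I decode the timestamp embedded in the resume token. This is a rough sketch; it assumes the usual resume token layout in which the 8 hex digits after the leading "82" marker encode the cluster-time seconds (big-endian):

import java.time.Instant;

public class ResumeTokenAge {
    public static void main(String[] args) {
        // _data value of the resume token from the earlier failure (quoted below)
        String data = "8267DD842E000000A22B0229296E04";
        // Hex chars 2..10 are the 4-byte BSON timestamp seconds, assuming the
        // "82"-prefixed token layout.
        long seconds = Long.parseLong(data.substring(2, 10), 16);
        // Prints 2025-03-21T15:22:22Z for this token, matching the creation
        // time Xiqian derived in the reply quoted below.
        System.out.println(Instant.ofEpochSecond(seconds));
    }
}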
Is there something I am still missing? How can I get this issue fixed?
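One option I am considering is asking the team managing our cluster to widen the oplog retention so the resume point survives longer. This is a sketch of what I would ask them to run; the connection string and sizes are illustrative, it requires admin privileges, and on Atlas the oplog size / minimum retention window is normally changed through the cluster configuration rather than by running the command directly:

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.Document;

public class ResizeOplog {
    public static void main(String[] args) {
        // Placeholder connection string, not our real cluster.
        try (MongoClient client = MongoClients.create("mongodb+srv://admin:****@cluster-host/")) {
            // replSetResizeOplog: grow the oplog and set a minimum retention
            // window so resume tokens stay valid longer (MongoDB 4.4+).
            Document result = client.getDatabase("admin").runCommand(
                new Document("replSetResizeOplog", 1)
                    .append("size", 51200d)             // target oplog size in MB (illustrative)
                    .append("minRetentionHours", 72d)); // keep >= 72h of history (illustrative)
            System.out.println(result.toJson());
        }
    }
}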
Thanks
Sachin

On Tue, Mar 25, 2025 at 2:20 PM Xiqian YU <kono....@outlook.com> wrote:

> Hi Sachin,
>
> It seems MongoDB CDC is trying to resume from a previous change stream
> position which has been removed from the oplog collection. The resumeToken
> provided (8267DD842E000000A22B0229296E04) was created on Friday, March 21,
> 2025 at 3:22:22 PM (4 days ago), which may have exceeded the MongoDB
> server oplog's TTL.
>
> On the CDC client side, there is a "heartbeat.interval.ms" [1] option to
> send heartbeat requests to the MongoDB server regularly and refresh the
> resume token position. It is suggested to set it to a reasonable interval
> if the captured collection doesn't produce many change logs.
>
> [1]
> https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/docs/connectors/flink-sources/mongodb-cdc/#connector-options
>
> Best Regards,
> Xiqian
>
> On Mar 25, 2025, at 15:42, Sachin Mittal <sjmit...@gmail.com> wrote:
>
> Hi,
> We are using Flink CDC in our datastream application deployed on AWS KDA.
> Our MongoDB is deployed on MongoDB Atlas.
>
> The versions are:
> Flink: 1.20.0
> MongoCDC (flink-connector-mongodb-cdc): 3.1.1
>
> After the application has been running for a few days, I get the following
> error:
>
> java.lang.RuntimeException: One or more fetchers have encountered exception
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333)
> at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228)
> at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190)
> at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:444)
> at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:972)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:951)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:765)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
> at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> ... 1 more
> Caused by: java.io.IOException: org.apache.flink.util.FlinkRuntimeException: Read split StreamSplit{splitId='stream-split', offset={resumeToken={"_data": "8267DD842E000000A22B0229296E04"}, timestamp=7484283488862994594}, endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false} error due to Open change stream failed.
> at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101)
> at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
> ... 6 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Read split StreamSplit{splitId='stream-split', offset={resumeToken={"_data": "8267DD842E000000A22B0229296E04"}, timestamp=7484283488862994594}, endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false} error due to Open change stream failed.
> at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.checkReadException(IncrementalSourceStreamFetcher.java:137)
> at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.pollSplitRecords(IncrementalSourceStreamFetcher.java:115)
> at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:192)
> at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98)
> ... 8 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Open change stream failed
> at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:317)
> at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:248)
> at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.execute(MongoDBStreamFetchTask.java:104)
> at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
> ... 5 more
> Caused by: com.mongodb.MongoCommandException: Command failed with error 286 (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog.' on server xxxxx.yyyy.mongodb.net:27017. The full response is {"errorLabels": ["NonResumableChangeStreamError"], "ok": 0.0, "errmsg": "PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog.", "code": 286, "codeName": "ChangeStreamHistoryLost", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1742888042, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "AAjSl5Tuyjc+8qWOD10ThSBQOM4=", "subType": "00"}}, "keyId": 7448674344508588123}}, "operationTime": {"$timestamp": {"t": 1742888042, "i": 1}}}
> at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:205)
> at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:443)
> at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:365)
> at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114)
> at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:643)
> at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:73)
>
> I think the error mainly originates on the MongoDB oplog side. Any idea how
> I can approach the team handling MongoDB to get them to resolve this at
> their end, so such errors are not propagated to the Flink side?
> Is there anything I can fix on the Flink or Flink CDC side to stop the
> application from continuously restarting due to these errors?
>
> Thanks
> Sachin