Hello folks,

I have applied the solution you suggested. I declared the MongoDB CDC source as:

MongoDBSource.<T>builder()
    .hosts(hostname)
    .scheme(SCHEME)
    .databaseList(dbName)
    .collectionList(dbName + "." + collectionName)
    .username(username)
    .password(password)
    .startupOptions(StartupOptions.initial())
    .batchSize(2048)
    .connectionOptions("heartbeat.interval.ms=5000")
    .closeIdleReaders(true)
    .deserializer(...)
    .build();


However, after around a week of running this stream, I got the
following error again:

com.mongodb.MongoCommandException: Command failed with error 286
(ChangeStreamHistoryLost): 'PlanExecutor error during aggregation ::
caused by :: Resume of change stream was not possible, as the resume
point may no longer be in the oplog.' on server
xxxxx.yyy.mongodb.net:27017. The full response is {"errorLabels":
["NonResumableChangeStreamError"], "ok": 0.0, "errmsg": "PlanExecutor
error during aggregation :: caused by :: Resume of change stream was
not possible, as the resume point may no longer be in the oplog.",
"code": 286, "codeName": "ChangeStreamHistoryLost", "$clusterTime":
{"clusterTime": {"$timestamp": {"t": 1746774243, "i": 1}},
"signature": {"hash": {"$binary": {"base64":
"wdywOjmcLPIkhRxA0PsD2JHXzxs=", "subType": "00"}}, "keyId":
7448674344508588123}}, "operationTime": {"$timestamp": {"t":
1746774243, "i": 1}}}

        at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:205)
        at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:443)
        at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:365)
        at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114)
        at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:643)
        at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:73)
        at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:204)
        at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:122)
        at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:87)
        at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:76)
        at com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:288)
        at com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecute(CommandOperationHelper.java:239)
        at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$4(CommandOperationHelper.java:220)
        at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$0(OperationHelper.java:358)
        at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:383)
        at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$1(OperationHelper.java:357)
        at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:383)
        at com.mongodb.internal.operation.OperationHelper.withSourceAndConnection(OperationHelper.java:356)
        at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$5(CommandOperationHelper.java:218)
        at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67)
        at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:223)
        at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:204)
        at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:191)
        at com.mongodb.internal.operation.ChangeStreamOperation.lambda$execute$0(ChangeStreamOperation.java:187)
        at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:321)
        at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:185)
        at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:55)
        at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:185)
        at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:212)
        at com.mongodb.client.internal.ChangeStreamIterableImpl.access$000(ChangeStreamIterableImpl.java:55)
        at com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:139)
        at com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:131)
        at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:286)
        at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:248)
        at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.execute(MongoDBStreamFetchTask.java:104)
        at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
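As a sanity check on how far behind the stream fell, the creation time of a change-stream resume token can be read straight out of its "_data" hex string: the 8 hex digits after the leading 0x82 type byte hold the seconds field of the BSON timestamp. This layout is an undocumented MongoDB implementation detail, so treat the decoding below as a heuristic only; the token and operationTime values are the ones from the earlier trace in this thread:

```java
import java.time.Instant;

public class ResumeTokenAge {

    /**
     * Heuristic: in a v1 resume token, byte 0 (0x82) is the KeyString type
     * marker for a BSON timestamp, and hex chars 2..10 are the big-endian
     * seconds value. This layout is an implementation detail, not a
     * documented API.
     */
    static long tokenSeconds(String tokenHex) {
        return Long.parseLong(tokenHex.substring(2, 10), 16);
    }

    public static void main(String[] args) {
        String token = "8267DD842E000000A22B0229296E04"; // from the trace
        long created = tokenSeconds(token);              // token creation time (s)
        long failedAt = 1742888042L;                     // operationTime.t in the error

        System.out.println("token created: " + Instant.ofEpochSecond(created));
        System.out.println("resume failed: " + Instant.ofEpochSecond(failedAt));
        System.out.printf("gap: %.2f days%n", (failedAt - created) / 86400.0);
    }
}
```

For the values in the earlier trace this prints a creation time of 2025-03-21T15:22:22Z and a gap of about 3.7 days, which matches Xiqian's reading and suggests the Atlas oplog window was shorter than the time the stream spent without a fresh token.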


Is there something I am still missing?

How can I get this issue fixed?

Thanks
Sachin


On Tue, Mar 25, 2025 at 2:20 PM Xiqian YU <kono....@outlook.com> wrote:

> Hi Sachin,
>
> It seems MongoDB CDC is trying to resume from a previous change stream
> position which has been removed from the oplog collection. The resume
> token provided (8267DD842E000000A22B0229296E04) was created at Friday,
> March 21, 2025 3:22:22 PM (4 days ago), which may have exceeded the
> MongoDB server oplog's TTL.
>
> On the CDC client side, there is a "heartbeat.interval.ms" [1] option to
> send heartbeat requests to the MongoDB server regularly and refresh the
> resume token position. It is suggested to set it to a reasonable interval
> if the captured collection doesn't produce many change events.
>
> [1]
> https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/docs/connectors/flink-sources/mongodb-cdc/#connector-options
>
> Best Regards,
> Xiqian
>
> On Mar 25, 2025, at 15:42, Sachin Mittal <sjmit...@gmail.com> wrote:
>
> Hi,
> We are using Flink CDC in our datastream application deployed on AWS KDA.
> Our MongoDB is deployed on Mongo Atlas.
>
> The versions are:
> Flink : 1.20.0
> MongoCDC (flink-connector-mongodb-cdc) :  3.1.1
>
> After the application has been running for a few days, I get the following error:
>
> java.lang.RuntimeException: One or more fetchers have encountered exception
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190)
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:444)
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:972)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:951)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:765)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     ... 1 more
> Caused by: java.io.IOException: org.apache.flink.util.FlinkRuntimeException: Read split StreamSplit{splitId='stream-split', offset={resumeToken={"_data": "8267DD842E000000A22B0229296E04"}, timestamp=7484283488862994594}, endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false} error due to Open change stream failed.
>     at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101)
>     at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
>     ... 6 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Read split StreamSplit{splitId='stream-split', offset={resumeToken={"_data": "8267DD842E000000A22B0229296E04"}, timestamp=7484283488862994594}, endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false} error due to Open change stream failed.
>     at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.checkReadException(IncrementalSourceStreamFetcher.java:137)
>     at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.pollSplitRecords(IncrementalSourceStreamFetcher.java:115)
>     at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:192)
>     at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98)
>     ... 8 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Open change stream failed
>     at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:317)
>     at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:248)
>     at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.execute(MongoDBStreamFetchTask.java:104)
>     at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
>     ... 5 more
> Caused by: com.mongodb.MongoCommandException: Command failed with error 286 (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog.' on server xxxxx.yyyy.mongodb.net:27017. The full response is {"errorLabels": ["NonResumableChangeStreamError"], "ok": 0.0, "errmsg": "PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog.", "code": 286, "codeName": "ChangeStreamHistoryLost", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1742888042, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "AAjSl5Tuyjc+8qWOD10ThSBQOM4=", "subType": "00"}}, "keyId": 7448674344508588123}}, "operationTime": {"$timestamp": {"t": 1742888042, "i": 1}}}
>     at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:205)
>     at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:443)
>     at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:365)
>     at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114)
>     at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:643)
>     at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:73)
>
> I think the error originates mainly on the MongoDB oplog side. Any idea
> how I can approach the team handling MongoDB to get them to resolve it at
> their end, so such errors are not propagated to the Flink side?
> Is there anything I can fix on the Flink or Flink CDC side to stop the
> application from continuously restarting due to these errors?
>
> Thanks
> Sachin
>
>
>
