Hi,
I have raised a JIRA for this:
https://issues.apache.org/jira/browse/FLINK-37909

I have also opened a PR which I believe fixes the issue:
https://github.com/apache/flink-cdc/pull/4039

Could someone from the Flink community take the initiative and help get
this fixed?

Thanks
Sachin
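To make the suspected bug easier to see, here is a minimal, self-contained
Java sketch of the pattern described below. The class and method names are
illustrative only, not the actual Flink CDC types: the heartbeat helper
captures the cursor reference once at construction, so when the fetch loop
later reopens the cursor, heartbeats keep running against the old one.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class StaleCursorDemo {

    // Stand-in for MongoChangeStreamCursor; generation tracks reopens.
    static class Cursor {
        final int generation;
        Cursor(int generation) { this.generation = generation; }
    }

    // Stand-in for HeartbeatManager: holds the cursor it was built with
    // and is never told about a replacement.
    static class HeartbeatManager {
        private final Cursor cursor; // captured once, never refreshed
        HeartbeatManager(Cursor cursor) { this.cursor = cursor; }
        int heartbeatGeneration() { return cursor.generation; }
    }

    static final AtomicInteger opens = new AtomicInteger();

    // Each call simulates openChangeStreamCursor(...) returning a new cursor.
    static Cursor openChangeStreamCursor() {
        return new Cursor(opens.incrementAndGet());
    }

    public static void main(String[] args) {
        Cursor cursor = openChangeStreamCursor();           // initial cursor
        HeartbeatManager hb = new HeartbeatManager(cursor); // holds that cursor

        // Inside the fetch loop the cursor is reopened on expiry...
        cursor = openChangeStreamCursor();

        // ...but the heartbeat still advances resume tokens on the old one.
        if (hb.heartbeatGeneration() != cursor.generation) {
            System.out.println("heartbeat uses stale cursor generation "
                    + hb.heartbeatGeneration()
                    + ", stream is on generation " + cursor.generation);
        }
        // One way to address this (under the assumption above) is to
        // recreate the heartbeat manager whenever the cursor is reopened:
        // hb = new HeartbeatManager(cursor);
    }
}
```

Under this reading, any fix would need to rebind (or recreate) the heartbeat
manager each time the change stream cursor is reopened inside the loop.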


On Thu, Jun 5, 2025 at 9:18 AM Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> I'm having some difficulty understanding the code at:
>
>
> https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java#L95
>
> Outside the while loop we initialize:
>
>     MongoChangeStreamCursor<BsonDocument> changeStreamCursor =
>             openChangeStreamCursor(descriptor);
>     HeartbeatManager heartbeatManager =
>             openHeartbeatManagerIfNeeded(changeStreamCursor);
>
> However, inside the while (taskRunning) loop we reassign:
>
>     changeStreamCursor = openChangeStreamCursor(descriptor, resumeTokenExpires);
>
> So heartbeatManager keeps running against the old changeStreamCursor,
> which has since been replaced.
>
> As a result, we frequently get the following error after the DataStream
> job has been running for a few days:
>
> com.mongodb.MongoCommandException: Command failed with error 286
> (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused
> by :: Resume of change stream was not possible, as the resume point may no
> longer be in the oplog.' on server xxxx.yyy.mongodb.net:27017. The full
> response is {"errorLabels": ["NonResumableChangeStreamError"], "ok": 0.0,
> "errmsg": "PlanExecutor error during aggregation :: caused by :: Resume of
> change stream was not possible, as the resume point may no longer be in the
> oplog.", "code": 286, "codeName": "ChangeStreamHistoryLost",
> "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1747876810, "i": 30}},
> "signature": {"hash": {"$binary": {"base64":
> "8rIRtcVyp/u8ddzGnA4Z5r1L79A=", "subType": "00"}}, "keyId":
> 7448674344508588123}}, "operationTime": {"$timestamp": {"t": 1747876810,
> "i": 30}}}
> at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:205)
> at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:443)
> at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:365)
> at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114)
> at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:643)
> at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:73)
> at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:204)
> at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:122)
> at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:87)
> at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:76)
> at com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:288)
> at com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecute(CommandOperationHelper.java:239)
> at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$4(CommandOperationHelper.java:220)
> at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$0(OperationHelper.java:358)
> at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:383)
> at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$1(OperationHelper.java:357)
> at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:383)
> at com.mongodb.internal.operation.OperationHelper.withSourceAndConnection(OperationHelper.java:356)
> at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$5(CommandOperationHelper.java:218)
> at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67)
> at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:223)
> at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:204)
> at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:191)
> at com.mongodb.internal.operation.ChangeStreamOperation.lambda$execute$0(ChangeStreamOperation.java:187)
> at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:321)
> at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:185)
> at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:55)
> at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:185)
> at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:212)
> at com.mongodb.client.internal.ChangeStreamIterableImpl.access$000(ChangeStreamIterableImpl.java:55)
> at com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:139)
> at com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:131)
> at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:286)
> at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.execute(MongoDBStreamFetchTask.java:123)
>
> Based on raising this issue with MongoDB support they have reported:
>
> Following the log review, I have two concerns related to the resumeToken:
>
>    1. The workload using resumeAfter appears to be prone to having its
>    resumeToken "rewound" to an older timestamp that the workload appears to
>    already have processed
>    2. The workload using startAfter doesn't appear to reliably
>    save/update its progress with updated checkpoints at all
>
> In both of these cases, it seems you may have some issues in the way the
> application is storing the resumeToken:
>
>    1. Either not consistently updated or
>    2. Prone to being overwritten with an older change
>
> In both cases, this leads to the token eventually falling off the oplog
> due to trying to resume from a timestamp that no longer exists. In the case
> of #1, your application logic may not be consistently updating resumeTokens
> with each iteration of the change stream cursor. In the case of #2, it may
> be that a competing application thread wrote an older timestamp.
> Please let me know whether my findings and what MongoDB support reported
> are related. It looks like heartbeatManager runs on the older
> changeStreamCursor, which would explain resume tokens not being updated,
> or being rewound to an older timestamp.
>
> Thanks
> Sachin
>
>
>
