Hi,
I have a data stream application which pulls data from MongoDB using
CDC. After the process runs for a few days, it fails with the following
stack trace:

com.mongodb.MongoCommandException: Command failed with error 286
(ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused
by :: Resume of change stream was not possible, as the resume point may no
longer be in the oplog.' on server xxxx.yyy.mongodb.net:27017. The full
response is {"errorLabels": ["NonResumableChangeStreamError"], "ok": 0.0,
"errmsg": "PlanExecutor error during aggregation :: caused by :: Resume of
change stream was not possible, as the resume point may no longer be in the
oplog.", "code": 286, "codeName": "ChangeStreamHistoryLost",
"$clusterTime": {"clusterTime": {"$timestamp": {"t": 1747876810, "i": 30}},
"signature": {"hash": {"$binary": {"base64":
"8rIRtcVyp/u8ddzGnA4Z5r1L79A=", "subType": "00"}}, "keyId":
7448674344508588123}}, "operationTime": {"$timestamp": {"t": 1747876810,
"i": 30}}}
at
com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:205)
at
com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:443)
at
com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:365)
at
com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114)
at
com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:643)
at
com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:73)
at
com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:204)
at
com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:122)
at
com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:87)
at
com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:76)
at
com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:288)
at
com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecute(CommandOperationHelper.java:239)
at
com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$4(CommandOperationHelper.java:220)
at
com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$0(OperationHelper.java:358)
at
com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:383)
at
com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$1(OperationHelper.java:357)
at
com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:383)
at
com.mongodb.internal.operation.OperationHelper.withSourceAndConnection(OperationHelper.java:356)
at
com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$5(CommandOperationHelper.java:218)
at
com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67)
at
com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:223)
at
com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:204)
at
com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:191)
at
com.mongodb.internal.operation.ChangeStreamOperation.lambda$execute$0(ChangeStreamOperation.java:187)
at
com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:321)
at
com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:185)
at
com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:55)
at
com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:185)
at
com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:212)
at
com.mongodb.client.internal.ChangeStreamIterableImpl.access$000(ChangeStreamIterableImpl.java:55)
at
com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:139)
at
com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:131)
at
org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:286)
at
org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.execute(MongoDBStreamFetchTask.java:123)
at
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)

I have instantiated my source as follows:

MongoDBSource.<T>builder()
    ...
    .startupOptions(StartupOptions.initial())
    .batchSize(2048)
    .connectionOptions("heartbeat.interval.ms=5000")
    .heartbeatIntervalMillis(5000)
    .closeIdleReaders(true)
    .deserializer(...)
    .build();


As mentioned in the docs, I have added a heartbeat so that the job does
not fail when the oplog changes infrequently. However, these settings seem
to have no effect: the job still fails the same way it did before I added
the heartbeat.

Can anyone tell me what else I could fix here?
Are there any settings I am missing on the MongoDB side?

Thanks
Sachin
