Hi ssmi,

No duplicates are produced. Exactly-once is guaranteed on the source side,
not downstream.

Before the `BinlogSplit` is suspended to accept newly added tables, the
`highWatermark` of every finished snapshot split of the existing tables is
**forwarded to the current binlog reading position** `P_current`. Later,
when the `startingOffset` is rewound to the minimum `highWatermark` `P_min`
(so the new tables can pick up the binlog events that follow their
snapshots), `BinlogSplitReader.shouldEmit()` filters records per table:
events from existing tables in `(P_min, P_current]` are dropped because
their `highWatermark` is now `P_current`, while events from new tables in
`(P_new, ...)`, after their own `highWatermark` `P_new`, pass through. The
offset rewinds globally, but the per-table watermarks prevent re-emission.
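
To make this concrete, here is a toy model of the per-table filter. It is a
sketch under simplifying assumptions, not the connector's code: binlog
positions are plain `long`s, each event carries only its table name, and a
single `highWatermark` per table stands in for the real per-chunk
finished-split infos. `Event`, `shouldEmit` and `replay` here are
hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical names): positions are longs, one highWatermark per
// table instead of the real per-chunk finished snapshot-split infos.
public class HighWatermarkFilterSketch {

    record Event(long position, String table) {}

    // Emit an event only if it lies strictly after its table's highWatermark.
    static boolean shouldEmit(Event e, Map<String, Long> highWatermarks) {
        Long hw = highWatermarks.get(e.table());
        return hw != null && e.position() > hw;
    }

    // Re-read the binlog after the (rewound) starting offset and apply the filter.
    static List<Event> replay(List<Event> binlog, long startingOffset,
                              Map<String, Long> highWatermarks) {
        List<Event> emitted = new ArrayList<>();
        for (Event e : binlog) {
            if (e.position() > startingOffset && shouldEmit(e, highWatermarks)) {
                emitted.add(e);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> binlog = List.of(
                new Event(10, "old"), new Event(20, "new"),
                new Event(30, "old"), new Event(40, "new"),
                new Event(50, "old"), new Event(60, "new"));

        long pCurrent = 30; // reader already emitted "old" events up to 30, then suspended
        long pNew = 15;     // highWatermark of the new table's snapshot split

        Map<String, Long> hw = new HashMap<>();
        hw.put("old", pCurrent); // forwarded at suspend time
        hw.put("new", pNew);

        long pMin = Math.min(pCurrent, pNew); // startingOffset is rewound to P_min
        // Prints new@20, new@40, old@50, new@60; old@30 is not emitted a second time.
        for (Event e : replay(binlog, pMin, hw)) {
            System.out.println(e.table() + "@" + e.position());
        }
    }
}
```

With `P_current = 30` and `P_new = 15`, the replay from `P_min = 15` skips
the `old` event at position 30 (already emitted before the suspension) and
emits only events the job has not seen yet.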

*Key Code*

   - `BinlogSplitReader.java`
     <https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java#L321>
     `shouldEmit()`: per-record, per-table filter against `highWatermark`.
   - `MySqlBinlogSplit.java`
     <https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java#L337>
     `forwardHighWatermarkToStartingOffset()`: the dedup primitive; it
     rewrites the old splits' `highWatermark` to `P_current` at suspend time.
   - `MySqlBinlogSplit.java`
     <https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java#L209>
     `appendFinishedSplitInfos()`: rewinds `startingOffset` to
     `min(old, new highWatermarks)` after the new-table snapshot completes.
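
The two `MySqlBinlogSplit` helpers can likewise be pictured as state
transitions on a split. This is a hypothetical, simplified model, not the
real signatures: the actual split holds `BinlogOffset`s and a list of
finished snapshot-split infos, whereas here offsets are `long`s, there is
one watermark per table, and `Split`, `forwardHighWatermarks` and
`appendFinished` are illustrative names. Moving `startingOffset` to the
suspend position in the forward step is an assumption of this sketch.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the suspend (forward) and resume (append + rewind) steps.
public class BinlogSplitStateSketch {

    // Simplified split: starting offset plus one highWatermark per table.
    record Split(long startingOffset, Map<String, Long> highWatermarks) {}

    // Suspend time: every existing table's highWatermark becomes the current position.
    // Assumption: the split also resumes from that position.
    static Split forwardHighWatermarks(Split split, long currentOffset) {
        Map<String, Long> forwarded = new TreeMap<>();
        split.highWatermarks().keySet().forEach(t -> forwarded.put(t, currentOffset));
        return new Split(currentOffset, forwarded);
    }

    // After the new tables' snapshots: merge their watermarks and rewind to the minimum.
    static Split appendFinished(Split split, Map<String, Long> newHighWatermarks) {
        Map<String, Long> merged = new TreeMap<>(split.highWatermarks());
        merged.putAll(newHighWatermarks);
        long min = merged.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(split.startingOffset());
        return new Split(min, merged);
    }

    public static void main(String[] args) {
        Split restored = new Split(5, Map.of("orders", 8L, "users", 3L));
        Split suspended = forwardHighWatermarks(restored, 30);
        Split resumed = appendFinished(suspended, Map.of("payments", 15L));
        // Split[startingOffset=15, highWatermarks={orders=30, payments=15, users=30}]
        System.out.println(resumed);
    }
}
```

The rewound `startingOffset` (15) re-reads positions the existing tables
already consumed, but their watermarks (30) make the per-table filter in
`shouldEmit()` drop those events.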


If you have any further questions, feel free to ask.

Best,
Yanquan


On Tue, Jun 9, 2026 at 22:07, lec ssmi <[email protected]> wrote:

> Hi Flink CDC community,
>
> I have a question regarding incremental snapshot behavior when restoring a
> Flink CDC job from checkpoint.
>
> Suppose a CDC job is restored from a checkpoint and, at the same time, new
> tables are added into the capture list (whitelist).
>
> As I understand the current implementation:
>
>    1. Existing tables continue consuming binlog.
>    2. Newly added tables start incremental snapshot.
>    3. After all snapshot splits finish, the BinlogSplit obtains all finished
>       snapshot split infos from SplitEnumerator.
>    4. The new binlog split start offset is adjusted to the minimum watermark
>       among all completed snapshot splits.
>
> My confusion is about possible duplicate consumption.
>
> During the period when BinlogSplit runs independently (before switching to
> the adjusted start offset), existing tables may already consume and emit
> some binlog events.
>
> Later, after the binlog start offset moves backward to the minimum
> snapshot watermark, those previously consumed events appear to become
> readable again.
>
> From my reading, shouldEmit() seems to compare the current binlog
> position with snapshot offsets to decide whether to emit records.
>
> However, I could not fully understand how duplication is avoided for
> existing tables whose records were already emitted during that standalone
> binlog reading period.
>
> Specifically:
>
>    - Is there additional state tracking to prevent re-emitting events
>      already processed before offset adjustment?
>    - Or is duplication possible in this scenario and expected to be handled
>      downstream?
>    - Which component guarantees exactly-once semantics here?
>
> I may have misunderstood some part of the snapshot/binlog coordination
> logic, so any clarification would be greatly appreciated.
>
> Thanks!
>
