Hi,casel

这段逻辑应该只有在处理到新增表的时候才会用到。
CDC 读取数据正常是会在所有SnapshotSplit读取完成后,才会下发BinlogSplit。
但是如果是在增量阶段重启作业,同时新增加了一些表,就会出现同时有BinlogSplit和SnapshotSplit的情况,此时才会走到这段逻辑。

Best,
Hang


key lou <louke...@gmail.com> 于2023年12月20日周三 16:24写道:

> 意思是当 有 binlog  就意味着 已经读完了 snapshot
>
> casel.chen <casel_c...@126.com> 于2023年12月19日周二 16:45写道:
>
> > 我在阅读flink-connector-mysql-cdc项目源码过程中遇到一个不清楚的地方,还请大佬指点,谢谢!
> >
> >
> > MySqlSplitReader类有一段代码如下,注释“(1) Reads binlog split firstly and then read
> > snapshot split”这一句话我不理解。
> > 为什么要先读binlog split再读snapshot split?为保证记录的时序性,不是应该先读全量的snapshot
> > split再读增量的binlog split么?
> >
> >
> > private MySqlRecords pollSplitRecords() throws InterruptedException {
> >         Iterator<SourceRecords> dataIt;
> >         if (currentReader == null) {
> >             // (1) Reads binlog split firstly and then read snapshot
> split
> >             if (binlogSplits.size() > 0) {
> >                 // the binlog split may come from:
> >                 // (a) the initial binlog split
> >                 // (b) added back binlog-split in newly added table
> process
> >                 MySqlSplit nextSplit = binlogSplits.poll();
> >                 currentSplitId = nextSplit.splitId();
> >                 currentReader = getBinlogSplitReader();
> >                 currentReader.submitSplit(nextSplit);
> >             } else if (snapshotSplits.size() > 0) {
> >                 MySqlSplit nextSplit = snapshotSplits.poll();
> >                 currentSplitId = nextSplit.splitId();
> >                 currentReader = getSnapshotSplitReader();
> >                 currentReader.submitSplit(nextSplit);
> >             } else {
> >                 LOG.info("No available split to read.");
> >             }
> >             dataIt = currentReader.pollSplitRecords();
> >             return dataIt == null ? finishedSplit() : forRecords(dataIt);
> >         } else if (currentReader instanceof SnapshotSplitReader) {
> >           ....
> >         }
> >         ...
> > }
>

回复