那意思是会优先处理已经在增量阶段的表,再处理新增快照阶段的表?顺序反过来的话会有什么影响?如果新增表全量数据比较多会导致其他表增量处理变慢是么?
在 2023-12-20 21:40:05,"Hang Ruan" <ruanhang1...@gmail.com> 写道: >Hi,casel > >这段逻辑应该只有在处理到新增表的时候才会用到。 >CDC 读取数据正常是会在所有SnapshotSplit读取完成后,才会下发BinlogSplit。 >但是如果是在增量阶段重启作业,同时新增加了一些表,就会出现同时有BinlogSplit和SnapshotSplit的情况,此时才会走到这段逻辑。 > >Best, >Hang > > >key lou <louke...@gmail.com> 于2023年12月20日周三 16:24写道: > >> 意思是当 有 binlog 就意味着 已经读完了 snapshot >> >> casel.chen <casel_c...@126.com> 于2023年12月19日周二 16:45写道: >> >> > 我在阅读flink-connector-mysql-cdc项目源码过程中遇到一个不清楚的地方,还请大佬指点,谢谢! >> > >> > >> > MySqlSplitReader类有一段代码如下,注释“(1) Reads binlog split firstly and then read >> > snapshot split”这一句话我不理解。 >> > 为什么要先读binlog split再读snapshot split?为保证记录的时序性,不是应该先读全量的snapshot >> > split再读增量的binlog split么? >> > >> > >> > private MySqlRecords pollSplitRecords() throws InterruptedException { >> > Iterator<SourceRecords> dataIt; >> > if (currentReader == null) { >> > // (1) Reads binlog split firstly and then read snapshot >> split >> > if (binlogSplits.size() > 0) { >> > // the binlog split may come from: >> > // (a) the initial binlog split >> > // (b) added back binlog-split in newly added table >> process >> > MySqlSplit nextSplit = binlogSplits.poll(); >> > currentSplitId = nextSplit.splitId(); >> > currentReader = getBinlogSplitReader(); >> > currentReader.submitSplit(nextSplit); >> > } else if (snapshotSplits.size() > 0) { >> > MySqlSplit nextSplit = snapshotSplits.poll(); >> > currentSplitId = nextSplit.splitId(); >> > currentReader = getSnapshotSplitReader(); >> > currentReader.submitSplit(nextSplit); >> > } else { >> > LOG.info("No available split to read."); >> > } >> > dataIt = currentReader.pollSplitRecords(); >> > return dataIt == null ? finishedSplit() : forRecords(dataIt); >> > } else if (currentReader instanceof SnapshotSplitReader) { >> > .... >> > } >> > ... >> > } >>