Re: Re: Flink CDC MySqlSplitReader问题

2023-12-24 文章 Hang Ruan
Hi,

我记得这段逻辑是为了保证在新增表后,binlog 读取能和新增表的快照读取一起进行,保证binlog读取不会中断。
这里应该是会先读binlog,然后再读snapshot,再是binlog。这样的切换,来保证binlog 能一直有数据读出来。

Best,
Hang

casel.chen  于2023年12月22日周五 10:44写道:

> 那意思是会优先处理已经在增量阶段的表,再处理新增快照阶段的表?顺序反过来的话会有什么影响?如果新增表全量数据比较多会导致其他表增量处理变慢是么?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-12-20 21:40:05,"Hang Ruan"  写道:
> >Hi,casel
> >
> >这段逻辑应该只有在处理到新增表的时候才会用到。
> >CDC 读取数据正常是会在所有SnapshotSplit读取完成后,才会下发BinlogSplit。
> >但是如果是在增量阶段重启作业,同时新增加了一些表,就会出现同时有BinlogSplit和SnapshotSplit的情况,此时才会走到这段逻辑。
> >
> >Best,
> >Hang
> >
> >
> >key lou  于2023年12月20日周三 16:24写道:
> >
> >> 意思是当 有 binlog  就意味着 已经读完了 snapshot
> >>
> >> casel.chen  于2023年12月19日周二 16:45写道:
> >>
> >> > 我在阅读flink-connector-mysql-cdc项目源码过程中遇到一个不清楚的地方,还请大佬指点,谢谢!
> >> >
> >> >
> >> > MySqlSplitReader类有一段代码如下,注释“(1) Reads binlog split firstly and then
> read
> >> > snapshot split”这一句话我不理解。
> >> > 为什么要先读binlog split再读snapshot split?为保证记录的时序性,不是应该先读全量的snapshot
> >> > split再读增量的binlog split么?
> >> >
> >> >
> >> > private MySqlRecords pollSplitRecords() throws InterruptedException {
> >> > Iterator dataIt;
> >> > if (currentReader == null) {
> >> > // (1) Reads binlog split firstly and then read snapshot
> >> split
> >> > if (binlogSplits.size() > 0) {
> >> > // the binlog split may come from:
> >> > // (a) the initial binlog split
> >> > // (b) added back binlog-split in newly added table
> >> process
> >> > MySqlSplit nextSplit = binlogSplits.poll();
> >> > currentSplitId = nextSplit.splitId();
> >> > currentReader = getBinlogSplitReader();
> >> > currentReader.submitSplit(nextSplit);
> >> > } else if (snapshotSplits.size() > 0) {
> >> > MySqlSplit nextSplit = snapshotSplits.poll();
> >> > currentSplitId = nextSplit.splitId();
> >> > currentReader = getSnapshotSplitReader();
> >> > currentReader.submitSplit(nextSplit);
> >> > } else {
> >> > LOG.info("No available split to read.");
> >> > }
> >> > dataIt = currentReader.pollSplitRecords();
> >> > return dataIt == null ? finishedSplit() :
> forRecords(dataIt);
> >> > } else if (currentReader instanceof SnapshotSplitReader) {
> >> >   
> >> > }
> >> > ...
> >> > }
> >>
>


Re:Re: Flink CDC MySqlSplitReader问题

2023-12-21 文章 casel.chen
那意思是会优先处理已经在增量阶段的表,再处理新增快照阶段的表?顺序反过来的话会有什么影响?如果新增表全量数据比较多会导致其他表增量处理变慢是么?

















在 2023-12-20 21:40:05,"Hang Ruan"  写道:
>Hi,casel
>
>这段逻辑应该只有在处理到新增表的时候才会用到。
>CDC 读取数据正常是会在所有SnapshotSplit读取完成后,才会下发BinlogSplit。
>但是如果是在增量阶段重启作业,同时新增加了一些表,就会出现同时有BinlogSplit和SnapshotSplit的情况,此时才会走到这段逻辑。
>
>Best,
>Hang
>
>
>key lou  于2023年12月20日周三 16:24写道:
>
>> 意思是当 有 binlog  就意味着 已经读完了 snapshot
>>
>> casel.chen  于2023年12月19日周二 16:45写道:
>>
>> > 我在阅读flink-connector-mysql-cdc项目源码过程中遇到一个不清楚的地方,还请大佬指点,谢谢!
>> >
>> >
>> > MySqlSplitReader类有一段代码如下,注释“(1) Reads binlog split firstly and then read
>> > snapshot split”这一句话我不理解。
>> > 为什么要先读binlog split再读snapshot split?为保证记录的时序性,不是应该先读全量的snapshot
>> > split再读增量的binlog split么?
>> >
>> >
>> > private MySqlRecords pollSplitRecords() throws InterruptedException {
>> > Iterator dataIt;
>> > if (currentReader == null) {
>> > // (1) Reads binlog split firstly and then read snapshot
>> split
>> > if (binlogSplits.size() > 0) {
>> > // the binlog split may come from:
>> > // (a) the initial binlog split
>> > // (b) added back binlog-split in newly added table
>> process
>> > MySqlSplit nextSplit = binlogSplits.poll();
>> > currentSplitId = nextSplit.splitId();
>> > currentReader = getBinlogSplitReader();
>> > currentReader.submitSplit(nextSplit);
>> > } else if (snapshotSplits.size() > 0) {
>> > MySqlSplit nextSplit = snapshotSplits.poll();
>> > currentSplitId = nextSplit.splitId();
>> > currentReader = getSnapshotSplitReader();
>> > currentReader.submitSplit(nextSplit);
>> > } else {
>> > LOG.info("No available split to read.");
>> > }
>> > dataIt = currentReader.pollSplitRecords();
>> > return dataIt == null ? finishedSplit() : forRecords(dataIt);
>> > } else if (currentReader instanceof SnapshotSplitReader) {
>> >   
>> > }
>> > ...
>> > }
>>


Re: Flink CDC MySqlSplitReader问题

2023-12-20 文章 Hang Ruan
Hi,casel

这段逻辑应该只有在处理到新增表的时候才会用到。
CDC 读取数据正常是会在所有SnapshotSplit读取完成后,才会下发BinlogSplit。
但是如果是在增量阶段重启作业,同时新增加了一些表,就会出现同时有BinlogSplit和SnapshotSplit的情况,此时才会走到这段逻辑。

Best,
Hang


key lou  于2023年12月20日周三 16:24写道:

> 意思是当 有 binlog  就意味着 已经读完了 snapshot
>
> casel.chen  于2023年12月19日周二 16:45写道:
>
> > 我在阅读flink-connector-mysql-cdc项目源码过程中遇到一个不清楚的地方,还请大佬指点,谢谢!
> >
> >
> > MySqlSplitReader类有一段代码如下,注释“(1) Reads binlog split firstly and then read
> > snapshot split”这一句话我不理解。
> > 为什么要先读binlog split再读snapshot split?为保证记录的时序性,不是应该先读全量的snapshot
> > split再读增量的binlog split么?
> >
> >
> > private MySqlRecords pollSplitRecords() throws InterruptedException {
> > Iterator dataIt;
> > if (currentReader == null) {
> > // (1) Reads binlog split firstly and then read snapshot
> split
> > if (binlogSplits.size() > 0) {
> > // the binlog split may come from:
> > // (a) the initial binlog split
> > // (b) added back binlog-split in newly added table
> process
> > MySqlSplit nextSplit = binlogSplits.poll();
> > currentSplitId = nextSplit.splitId();
> > currentReader = getBinlogSplitReader();
> > currentReader.submitSplit(nextSplit);
> > } else if (snapshotSplits.size() > 0) {
> > MySqlSplit nextSplit = snapshotSplits.poll();
> > currentSplitId = nextSplit.splitId();
> > currentReader = getSnapshotSplitReader();
> > currentReader.submitSplit(nextSplit);
> > } else {
> > LOG.info("No available split to read.");
> > }
> > dataIt = currentReader.pollSplitRecords();
> > return dataIt == null ? finishedSplit() : forRecords(dataIt);
> > } else if (currentReader instanceof SnapshotSplitReader) {
> >   
> > }
> > ...
> > }
>


Re: Flink CDC MySqlSplitReader问题

2023-12-20 文章 key lou
意思是当 有 binlog  就意味着 已经读完了 snapshot

casel.chen  于2023年12月19日周二 16:45写道:

> 我在阅读flink-connector-mysql-cdc项目源码过程中遇到一个不清楚的地方,还请大佬指点,谢谢!
>
>
> MySqlSplitReader类有一段代码如下,注释“(1) Reads binlog split firstly and then read
> snapshot split”这一句话我不理解。
> 为什么要先读binlog split再读snapshot split?为保证记录的时序性,不是应该先读全量的snapshot
> split再读增量的binlog split么?
>
>
> private MySqlRecords pollSplitRecords() throws InterruptedException {
> Iterator dataIt;
> if (currentReader == null) {
> // (1) Reads binlog split firstly and then read snapshot split
> if (binlogSplits.size() > 0) {
> // the binlog split may come from:
> // (a) the initial binlog split
> // (b) added back binlog-split in newly added table process
> MySqlSplit nextSplit = binlogSplits.poll();
> currentSplitId = nextSplit.splitId();
> currentReader = getBinlogSplitReader();
> currentReader.submitSplit(nextSplit);
> } else if (snapshotSplits.size() > 0) {
> MySqlSplit nextSplit = snapshotSplits.poll();
> currentSplitId = nextSplit.splitId();
> currentReader = getSnapshotSplitReader();
> currentReader.submitSplit(nextSplit);
> } else {
> LOG.info("No available split to read.");
> }
> dataIt = currentReader.pollSplitRecords();
> return dataIt == null ? finishedSplit() : forRecords(dataIt);
> } else if (currentReader instanceof SnapshotSplitReader) {
>   
> }
> ...
> }


Flink CDC MySqlSplitReader问题

2023-12-19 文章 casel.chen
我在阅读flink-connector-mysql-cdc项目源码过程中遇到一个不清楚的地方,还请大佬指点,谢谢!


MySqlSplitReader类有一段代码如下,注释“(1) Reads binlog split firstly and then read 
snapshot split”这一句话我不理解。
为什么要先读binlog split再读snapshot split?为保证记录的时序性,不是应该先读全量的snapshot split再读增量的binlog 
split么?


private MySqlRecords pollSplitRecords() throws InterruptedException {
Iterator dataIt;
if (currentReader == null) {
// (1) Reads binlog split firstly and then read snapshot split
if (binlogSplits.size() > 0) {
// the binlog split may come from:
// (a) the initial binlog split
// (b) added back binlog-split in newly added table process
MySqlSplit nextSplit = binlogSplits.poll();
currentSplitId = nextSplit.splitId();
currentReader = getBinlogSplitReader();
currentReader.submitSplit(nextSplit);
} else if (snapshotSplits.size() > 0) {
MySqlSplit nextSplit = snapshotSplits.poll();
currentSplitId = nextSplit.splitId();
currentReader = getSnapshotSplitReader();
currentReader.submitSplit(nextSplit);
} else {
LOG.info("No available split to read.");
}
dataIt = currentReader.pollSplitRecords();
return dataIt == null ? finishedSplit() : forRecords(dataIt);
} else if (currentReader instanceof SnapshotSplitReader) {
  
}
...
}