Re:Re: Flink CDC MySqlSplitReader问题

2023-12-21 文章 casel.chen
那意思是会优先处理已经在增量阶段的表,再处理新增快照阶段的表?顺序反过来的话会有什么影响?如果新增表全量数据比较多会导致其他表增量处理变慢是么?

















在 2023-12-20 21:40:05,"Hang Ruan"  写道:
>Hi,casel
>
>这段逻辑应该只有在处理到新增表的时候才会用到。
>CDC 读取数据正常是会在所有SnapshotSplit读取完成后,才会下发BinlogSplit。
>但是如果是在增量阶段重启作业,同时新增加了一些表,就会出现同时有BinlogSplit和SnapshotSplit的情况,此时才会走到这段逻辑。
>
>Best,
>Hang
>
>
>key lou  于2023年12月20日周三 16:24写道:
>
>> 意思是当 有 binlog  就意味着 已经读完了 snapshot
>>
>> casel.chen  于2023年12月19日周二 16:45写道:
>>
>> > 我在阅读flink-connector-mysql-cdc项目源码过程中遇到一个不清楚的地方,还请大佬指点,谢谢!
>> >
>> >
>> > MySqlSplitReader类有一段代码如下,注释“(1) Reads binlog split firstly and then read
>> > snapshot split”这一句话我不理解。
>> > 为什么要先读binlog split再读snapshot split?为保证记录的时序性,不是应该先读全量的snapshot
>> > split再读增量的binlog split么?
>> >
>> >
>> > private MySqlRecords pollSplitRecords() throws InterruptedException {
>> > Iterator dataIt;
>> > if (currentReader == null) {
>> > // (1) Reads binlog split firstly and then read snapshot
>> split
>> > if (binlogSplits.size() > 0) {
>> > // the binlog split may come from:
>> > // (a) the initial binlog split
>> > // (b) added back binlog-split in newly added table
>> process
>> > MySqlSplit nextSplit = binlogSplits.poll();
>> > currentSplitId = nextSplit.splitId();
>> > currentReader = getBinlogSplitReader();
>> > currentReader.submitSplit(nextSplit);
>> > } else if (snapshotSplits.size() > 0) {
>> > MySqlSplit nextSplit = snapshotSplits.poll();
>> > currentSplitId = nextSplit.splitId();
>> > currentReader = getSnapshotSplitReader();
>> > currentReader.submitSplit(nextSplit);
>> > } else {
>> > LOG.info("No available split to read.");
>> > }
>> > dataIt = currentReader.pollSplitRecords();
>> > return dataIt == null ? finishedSplit() : forRecords(dataIt);
>> > } else if (currentReader instanceof SnapshotSplitReader) {
>> >   
>> > }
>> > ...
>> > }
>>


Re: 退订

2023-12-21 文章 Junrui Lee
你好,

请发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org 来取消订阅邮件

Best,
Junrui

jimandlice  于2023年12月21日周四 17:43写道:

> 退订
> | |
> jimandlice
> |
> |
> 邮箱:jimandl...@163.com
> |


RE: Re:Flink脏数据处理

2023-12-21 文章 Jiabao Sun
Hi,

需要精准控制异常数据的话,就不太推荐flink sql了。
考虑使用DataStream将异常数据用侧流输出[1],再做补偿。

Best,
Jiabao

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/side_output/


On 2023/12/06 08:45:20 Xuyang wrote:
> Hi, 
> 目前flink sql主动收集脏数据的行为。有下面两种可行的办法:
> 1. 如果知道脏数据是什么格式,那么将脏数据打个标,不走正常的处理逻辑,只收集,然后由一个UDAF来负责在达到一定的量的时候cancen。
> 2. 如果不知道脏数据是什么格式,可以在处理数据的那一个节点上使用UDX来处理正常的数据和脏数据,同时统计脏数据的数量,在达到一定上限的时候抛异常。
> 
> 
> 但是这里在udx里抛异常应该只会导致作业fo,无法让作业达到失败的状态。
> 
> 
> 要想让作业达到失败的状态,如果在source端就可以识别到脏数据的话,需要魔改下source 
> connector,在识别到遇到多少脏数据的时候,不往后发数据就可以了。具体可以参考下[1]
> 
> 
> [1] 
> https://stackoverflow.com/questions/1153/how-to-stop-a-flink-streaming-job-from-program
> 
> 
> 
> --
> 
> Best!
> Xuyang
> 
> 
> 
> 
> 
> 在 2023-12-06 15:26:56,"刘建"  写道:
> >Hi:我想使用flinkSQL 进行数据同步,如将MySQL数据读取并写入到MySQL中, 如果中途存在脏数据, 下游就会写不进去, 
> >我如何收集这个脏数据呢, 当脏数据到达一定量的时候, 让该任务失败等等
> 

RE: Re:Re:flink sql支持批量lookup join

2023-12-21 文章 Jiabao Sun
Hi, casel.

使用三次lookup join是可以实现的,加上缓存,性能应该不差。

WITH users AS (
SELECT *
  FROM (VALUES(1, 'zhangsan'), (2, 'lisi'), (3, 'wangwu')) T (id, name)
)
SELECT orders.id, 
   u1.name as creator_name,
   u2.name as approver_name,
   u3.name as deployer_name
FROM (
   SELECT *
  FROM (VALUES(1, 1, 2, 3)) T (id, creator_id, approver_id, deployer_id)
) AS orders
LEFT JOIN users AS u1 ON orders.creator_id = u1.id
LEFT JOIN users AS u2 ON orders.approver_id = u2.id
LEFT JOIN users AS u3 ON orders.deployer_id = u3.id;

Best,
Jiabao

On 2023/11/22 12:44:47 "casel.chen" wrote:
> 有一张维表 user,包含id和name字段
> id  | name
> -
> 1 | zhangsan
> 2 | lisi
> 3 | wangwu
> 
> 
> 现在实时来了一条交易数据 
> id  | creator_id  | approver_id  | deployer_id
> -
> 1   | 1| 2   | 3
> 
> 
> 希望lookup维表user返回各用户名称
> id   |  creator_name   |  approver_name  |  deployer_name
> 
> 1| zhangsan  |  lisi|. wangwu
> 
> 
> 
> 以上场景用flink sql要如何实现?
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2023-11-22 12:37:10,"Xuyang"  写道:
> >Hi, casel.
> >可以对“批量lookup join”再描述详细一点么?看上去是符合一个lookup join里直接带上k1=v1 and k2=v2 and 
> >k3=v3的用法的。
> >
> >
> >
> >
> >--
> >
> >Best!
> >Xuyang
> >
> >
> >
> >
> >在 2023-11-22 11:55:11,"casel.chen"  写道:
> >>一行数据带了三个待lookup查询的key,分别是key1,key2和key3
> >>
> >>
> >>id key1 key2 key3
> >>想实现批量lookup查询返回一行数据 id value1 value2 value3
> >>
> >>
> >>查了下目前包括jdbc connector在内的lookup都不支持批量查询,所以只能先将多列转成多行分别lookup再将多行转成多列,如下所示
> >>id key1 key2 key3
> >>先将多列转成多行
> >>id key1
> >>id key2
> >>id key3
> >>
> >>分别进行lookup join后得到
> >>id value1
> >>id value2
> >>id value3
> >>最后多行转多列返回一行数据
> >>
> >>id value1 value2 value3
> >>
> >>
> >>上述方案目前我能想到的是通过udtf + udaf来实现,但缺点是不具备通用性。Flink社区打算原生支持么?
> 

退订

2023-12-21 文章 jimandlice
退订
| |
jimandlice
|
|
邮箱:jimandl...@163.com
|

退订

2023-12-21 文章 jimandlice
退订
| |
jimandlice
|
|
邮箱:jimandl...@163.com
|