Re:Re: Flink CDC MySqlSplitReader问题
那意思是会优先处理已经在增量阶段的表,再处理新增快照阶段的表?顺序反过来的话会有什么影响?如果新增表全量数据比较多会导致其他表增量处理变慢是么? 在 2023-12-20 21:40:05,"Hang Ruan" 写道: >Hi,casel > >这段逻辑应该只有在处理到新增表的时候才会用到。 >CDC 读取数据正常是会在所有SnapshotSplit读取完成后,才会下发BinlogSplit。 >但是如果是在增量阶段重启作业,同时新增加了一些表,就会出现同时有BinlogSplit和SnapshotSplit的情况,此时才会走到这段逻辑。 > >Best, >Hang > > >key lou 于2023年12月20日周三 16:24写道: > >> 意思是当 有 binlog 就意味着 已经读完了 snapshot >> >> casel.chen 于2023年12月19日周二 16:45写道: >> >> > 我在阅读flink-connector-mysql-cdc项目源码过程中遇到一个不清楚的地方,还请大佬指点,谢谢! >> > >> > >> > MySqlSplitReader类有一段代码如下,注释“(1) Reads binlog split firstly and then read >> > snapshot split”这一句话我不理解。 >> > 为什么要先读binlog split再读snapshot split?为保证记录的时序性,不是应该先读全量的snapshot >> > split再读增量的binlog split么? >> > >> > >> > private MySqlRecords pollSplitRecords() throws InterruptedException { >> > Iterator dataIt; >> > if (currentReader == null) { >> > // (1) Reads binlog split firstly and then read snapshot >> split >> > if (binlogSplits.size() > 0) { >> > // the binlog split may come from: >> > // (a) the initial binlog split >> > // (b) added back binlog-split in newly added table >> process >> > MySqlSplit nextSplit = binlogSplits.poll(); >> > currentSplitId = nextSplit.splitId(); >> > currentReader = getBinlogSplitReader(); >> > currentReader.submitSplit(nextSplit); >> > } else if (snapshotSplits.size() > 0) { >> > MySqlSplit nextSplit = snapshotSplits.poll(); >> > currentSplitId = nextSplit.splitId(); >> > currentReader = getSnapshotSplitReader(); >> > currentReader.submitSplit(nextSplit); >> > } else { >> > LOG.info("No available split to read."); >> > } >> > dataIt = currentReader.pollSplitRecords(); >> > return dataIt == null ? finishedSplit() : forRecords(dataIt); >> > } else if (currentReader instanceof SnapshotSplitReader) { >> > >> > } >> > ... >> > } >>
Re: 退订
你好, 请发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org 来取消订阅邮件 Best, Junrui jimandlice 于2023年12月21日周四 17:43写道: > 退订 > | | > jimandlice > | > | > 邮箱:jimandl...@163.com > |
RE: Re:Flink脏数据处理
Hi, 需要精准控制异常数据的话,就不太推荐flink sql了。 考虑使用DataStream将异常数据用侧流输出[1],再做补偿。 Best, Jiabao [1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/side_output/ On 2023/12/06 08:45:20 Xuyang wrote: > Hi, > 目前flink sql主动收集脏数据的行为。有下面两种可行的办法: > 1. 如果知道脏数据是什么格式,那么将脏数据打个标,不走正常的处理逻辑,只收集,然后由一个UDAF来负责在达到一定的量的时候cancen。 > 2. 如果不知道脏数据是什么格式,可以在处理数据的那一个节点上使用UDX来处理正常的数据和脏数据,同时统计脏数据的数量,在达到一定上限的时候抛异常。 > > > 但是这里在udx里抛异常应该只会导致作业fo,无法让作业达到失败的状态。 > > > 要想让作业达到失败的状态,如果在source端就可以识别到脏数据的话,需要魔改下source > connector,在识别到遇到多少脏数据的时候,不往后发数据就可以了。具体可以参考下[1] > > > [1] > https://stackoverflow.com/questions/1153/how-to-stop-a-flink-streaming-job-from-program > > > > -- > > Best! > Xuyang > > > > > > 在 2023-12-06 15:26:56,"刘建" 写道: > >Hi:我想使用flinkSQL 进行数据同步,如将MySQL数据读取并写入到MySQL中, 如果中途存在脏数据, 下游就会写不进去, > >我如何收集这个脏数据呢, 当脏数据到达一定量的时候, 让该任务失败等等 >
RE: Re:Re:flink sql支持批量lookup join
Hi, casel. 使用三次lookup join是可以实现的,加上缓存,性能应该不差。 WITH users AS ( SELECT * FROM (VALUES(1, 'zhangsan'), (2, 'lisi'), (3, 'wangwu')) T (id, name) ) SELECT orders.id, u1.name as creator_name, u2.name as approver_name, u3.name as deployer_name FROM ( SELECT * FROM (VALUES(1, 1, 2, 3)) T (id, creator_id, approver_id, deployer_id) ) AS orders LEFT JOIN users AS u1 ON orders.creator_id = u1.id LEFT JOIN users AS u2 ON orders.approver_id = u2.id LEFT JOIN users AS u3 ON orders.deployer_id = u3.id; Best, Jiabao On 2023/11/22 12:44:47 "casel.chen" wrote: > 有一张维表 user,包含id和name字段 > id | name > - > 1 | zhangsan > 2 | lisi > 3 | wangwu > > > 现在实时来了一条交易数据 > id | creator_id | approver_id | deployer_id > - > 1 | 1| 2 | 3 > > > 希望lookup维表user返回各用户名称 > id | creator_name | approver_name | deployer_name > > 1| zhangsan | lisi|. wangwu > > > > 以上场景用flink sql要如何实现? > > > > > > > > > > > > > > > 在 2023-11-22 12:37:10,"Xuyang" 写道: > >Hi, casel. > >可以对“批量lookup join”再描述详细一点么?看上去是符合一个lookup join里直接带上k1=v1 and k2=v2 and > >k3=v3的用法的。 > > > > > > > > > >-- > > > >Best! > >Xuyang > > > > > > > > > >在 2023-11-22 11:55:11,"casel.chen" 写道: > >>一行数据带了三个待lookup查询的key,分别是key1,key2和key3 > >> > >> > >>id key1 key2 key3 > >>想实现批量lookup查询返回一行数据 id value1 value2 value3 > >> > >> > >>查了下目前包括jdbc connector在内的lookup都不支持批量查询,所以只能先将多列转成多行分别lookup再将多行转成多列,如下所示 > >>id key1 key2 key3 > >>先将多列转成多行 > >>id key1 > >>id key2 > >>id key3 > >> > >>分别进行lookup join后得到 > >>id value1 > >>id value2 > >>id value3 > >>最后多行转多列返回一行数据 > >> > >>id value1 value2 value3 > >> > >> > >>上述方案目前我能想到的是通过udtf + udaf来实现,但缺点是不具备通用性。Flink社区打算原生支持么? >
退订
退订 | | jimandlice | | 邮箱:jimandl...@163.com |
退订
退订 | | jimandlice | | 邮箱:jimandl...@163.com |