Hi, 需要精准控制异常数据的话,就不太推荐flink sql了。 考虑使用DataStream将异常数据用侧流输出[1],再做补偿。
Best, Jiabao [1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/side_output/ On 2023/12/06 08:45:20 Xuyang wrote: > Hi, > 目前flink sql主动收集脏数据的行为。有下面两种可行的办法: > 1. 如果知道脏数据是什么格式,那么将脏数据打个标,不走正常的处理逻辑,只收集,然后由一个UDAF来负责在达到一定的量的时候cancen。 > 2. 如果不知道脏数据是什么格式,可以在处理数据的那一个节点上使用UDX来处理正常的数据和脏数据,同时统计脏数据的数量,在达到一定上限的时候抛异常。 > > > 但是这里在udx里抛异常应该只会导致作业fo,无法让作业达到失败的状态。 > > > 要想让作业达到失败的状态,如果在source端就可以识别到脏数据的话,需要魔改下source > connector,在识别到遇到多少脏数据的时候,不往后发数据就可以了。具体可以参考下[1] > > > [1] > https://stackoverflow.com/questions/44441153/how-to-stop-a-flink-streaming-job-from-program > > > > -- > > Best! > Xuyang > > > > > > 在 2023-12-06 15:26:56,"刘建" <li...@icloudwisdom.com> 写道: > >Hi:我想使用flinkSQL 进行数据同步,如将MySQL数据读取并写入到MySQL中, 如果中途存在脏数据, 下游就会写不进去, > >我如何收集这个脏数据呢, 当脏数据到达一定量的时候, 让该任务失败等等 >