Hi, 
目前flink sql主动收集脏数据的行为。有下面两种可行的办法:
1. 如果知道脏数据是什么格式,那么将脏数据打个标,不走正常的处理逻辑,只收集,然后由一个UDAF来负责在达到一定的量的时候cancen。
2. 如果不知道脏数据是什么格式,可以在处理数据的那一个节点上使用UDX来处理正常的数据和脏数据,同时统计脏数据的数量,在达到一定上限的时候抛异常。


但是这里在udx里抛异常应该只会导致作业fo,无法让作业达到失败的状态。


要想让作业达到失败的状态,如果在source端就可以识别到脏数据的话,需要魔改下source 
connector,在识别到遇到多少脏数据的时候,不往后发数据就可以了。具体可以参考下[1]


[1] 
https://stackoverflow.com/questions/44441153/how-to-stop-a-flink-streaming-job-from-program



--

    Best!
    Xuyang





在 2023-12-06 15:26:56,"刘建" <liuj...@icloudwisdom.com> 写道:
>Hi:我想使用flinkSQL 进行数据同步,如将MySQL数据读取并写入到MySQL中, 如果中途存在脏数据, 下游就会写不进去, 
>我如何收集这个脏数据呢, 当脏数据到达一定量的时候, 让该任务失败等等

回复