如果你是改了test表上的 status 关联字段,那么是会出现这个现象的。你一开始的 example 不是改 status 字段的。

这个问题的本质是 join key 和你最终的 sink key 不一致,导致可能出现乱序。
这个只需要在 sink 前显式按照 sink key shuffle 应该就能解决,比如加上一个 deduplicate by sink key 节点。
或者在 1.12 版本中,只需要 sink 并发与前面节点的并发不一样,框架也会自动加上一个 sink key shuffle。

关于你说的 join 节点热点问题,那是因为你的 status key 太少了,导致数据倾斜严重。





On Mon, 16 Nov 2020 at 12:03, jindy_liu <[email protected]> wrote:

> 怕图片看不清,
> 我文字补充下:
> 1、print的最后几行。
>
> 32> +I(1999991,jindy1999991,2020-07-03T18:04:22,0,statu0)
> 32> +I(1999992,jindy1999992,2020-07-03T18:04:22,0,statu0)
> 32> +I(1999993,jindy1999993,2020-07-03T18:04:22,0,statu0)
> 32> +I(1999994,jindy1999994,2020-07-03T18:04:22,0,statu0)
> 32> +I(1999995,jindy1999995,2020-07-03T18:04:22,0,statu0)
> 32> +I(1999996,jindy1999996,2020-07-03T18:04:22,0,statu0)
> 32> +I(1999997,jindy1999997,2020-07-03T18:04:22,0,statu0)
> 32> +I(1999998,jindy1999998,2020-07-03T18:04:22,0,statu0)
> 32> +I(1999999,jindy1999999,2020-07-03T18:04:22,0,statu0)
> 36> +I(0,jindy0,2020-07-06T20:01:15,3,statu3)
> 32> -D(0,jindy0,2020-07-06T20:01:15,0,statu0)
> 30> -D(1,jindy1,2020-11-12T00:00:02,1,statu1)
> 36> +I(1,jindy1,2020-11-12T00:00:02,3,statu3)
> 36> +I(2,jindy2,2020-07-03T18:04:22,3,statu3)
> 30> -D(2,jindy2,2020-07-03T18:04:22,2,statu2)
>
> 2、同时可以看出,大部分数据都在join的32这个subtask上做了处理。200w行处理都在一个subtask做了!
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>

回复