I took another careful look at your problem. Your join key is the status id, so right now the data is shuffled by status id and distributed to the different parallel instances of the join. If the status id of a row in the test table changes, records for the same test id will be processed by different join instances, which means the test data is already out of order at that point. Once that happens, adding a keyBy on the sink key downstream won't help.
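To make the shuffle concrete, here is a minimal sketch of the plain two-stream join in question (it is essentially the inner join from the query quoted below); the comments describe what happens when a status value changes:

-- Regular two-stream join between the two CDC tables. Flink
-- hash-partitions both inputs on the join key (t.status = s.id).
-- If a row in `test` changes status from, say, 2 to 3, the retraction
-- for the old row (status = 2) and the new row (status = 3) hash to
-- different join subtasks, so downstream they can arrive out of order
-- for the same test id.
SELECT t.*, s.name AS status_name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id;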
So when joining two CDC streams, be aware that the join key must not change; otherwise the only safe option is to run the join with a parallelism of 1. For your scenario, you could consider a lookup (dimension table) join against the status table instead: a lookup join does not shuffle by the join key, so the test data will not get out of order. The trade-off is that updates to the status table can no longer trigger a recomputation that updates the sink table; only updates to the test table will trigger the computation and update the sink. (A rough sketch of such a lookup join is appended after the quoted message below.)

Best,
Jark

On Mon, 16 Nov 2020 at 16:03, jindy_liu <[email protected]> wrote:

> 1. I tried it.
>
> I added a proctime column to the test table:
>
> CREATE TABLE test (
>   `id` INT,
>   `name` VARCHAR(255),
>   `time` TIMESTAMP(3),
>   `status` INT,
>   `proctime` AS PROCTIME(),
>   PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'no_lock',
>   'password' = 'no_lock',
>   'database-name' = 'ai_audio_lyric_task',
>   'table-name' = 'test',
>   'debezium.snapshot.locking.mode' = 'none'
> );
>
> and wrote a deduplication query:
>
> INSERT INTO test_status_print
> SELECT r.id, r.name, r.`time`, r.`proctime`, r.status, r.status_name
> FROM (
>   SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime) AS rowNum
>   FROM (
>     SELECT t.*, s.name AS status_name
>     FROM test AS t
>     LEFT JOIN status AS s ON t.status = s.id
>   )
> ) r WHERE rowNum = 1;
>
> But it fails with an error saying this is not supported:
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Deduplicate doesn't support
> consuming update and delete changes which is produced by node
> Join(joinType=[LeftOuterJoin], where=[(status = id0)], select=[id, name,
> time, status, proctime, id0, name0], leftInputSpec=[HasUniqueKey],
> rightInputSpec=[JoinKeyContainsUniqueKey])
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
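A rough, untested sketch of the lookup-join approach described above. It assumes the status table is re-declared with the 'jdbc' connector so it can serve as a dimension table; the name status_dim, the column types, the JDBC URL and the credentials are placeholders, and test_status_print is the sink table from the quoted query.

-- Re-declare status as a JDBC table so it can be used as a lookup
-- (dimension) table. Connection options below are placeholders.
CREATE TABLE status_dim (
  `id` INT,
  `name` VARCHAR(255),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/ai_audio_lyric_task',
  'table-name' = 'status',
  'username' = '...',
  'password' = '...'
);

-- Lookup join on the processing-time attribute of test. There is no
-- shuffle by the join key, so records for one test id keep their order
-- and the ROW_NUMBER deduplication is no longer needed. Note that
-- changes to the status table alone will not update the sink.
INSERT INTO test_status_print
SELECT t.id, t.name, t.`time`, t.proctime, t.status, s.name AS status_name
FROM test AS t
LEFT JOIN status_dim FOR SYSTEM_TIME AS OF t.proctime AS s
  ON t.status = s.id;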
