Flink SQL作业示意如下:
create table user_source_table ( id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED, name STRING, dept_id BIGINT NOT NULL, proctime AS PROCTIME() ) with ( 'connector' = 'kafka', 'format' = 'canal-json', ... ); create table department_dim_table ( id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED, name STRING ) with ( 'connector' = 'jdbc', ... ); create table user_rich_sink_table ( id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED, name STRING, dept_name STRING ) with ( 'connector' = 'jdbc' ... ); insert into user_rich_sink_table select id, name, d.name as dept_name from user_source_table u left join department_dim_table for system_time as of u.proctime as d on u.dept_id = d.id; 用户id是主键,按你所说需要在最后insert into语句之前自己显示加group by用户id再insert? 现在是发现当作业并行度大于1时,相同用户id的记录会落到不同TaskManager上,造成数据更新状态不一致。 在 2023-02-20 08:41:20,"Shammon FY" <zjur...@gmail.com> 写道: >Hi > >如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作 > >Best, >Shammon > > >On Sun, Feb 19, 2023 at 1:43 PM RS <tinyshr...@163.com> wrote: > >> Hi, >> connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重 >> 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert into >> >> >> Thanks >> >> >> >> 在 2023-02-17 15:56:51,"casel.chen" <casel_c...@126.com> 写道: >> >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner >> join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。 >> >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink >> Function的invoke方法打的日志),该行为导致最终结果表数据不正确。 >> > >> > >> >请问: >> >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗? >> >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢? >> >我理解flink >> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。 >> > >>