Scenario: real-time sync of MySQL data into MongoDB. The upstream MySQL binlog is published to a single Kafka topic, with no guarantee that records for the same primary key land in the same partition. To ensure the downstream MongoDB sink persists all records for a given primary key in order, the stream needs to be keyed by the primary key (keyBy), and then batch-written to MongoDB.
Question: can Flink SQL solve this? If so, how should the SQL be written?


create table person_source (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  age BIGINT
) with (
   'connector' = 'kafka',
   ......
   'format' = 'canal-json'
);


create view person_view as 
select id, ??? from person_source group by id;


create table person_sink (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  age BIGINT
) with (
   'connector' = 'mongodb',
   ......
   'format' = 'json'
);


insert into person_sink select * from person_view;
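
One possible way to fill in the view body (a sketch under assumptions, not a confirmed answer): Flink SQL provides a built-in LAST_VALUE aggregate, and a GROUP BY on the primary key both collapses the changelog to the latest row per key and hash-distributes records by that key, which plays the same role as a keyBy in the DataStream API:

```sql
-- Sketch: GROUP BY id shuffles all records for the same primary key
-- to the same task, so updates for one id are applied in order before
-- reaching the sink. LAST_VALUE keeps the most recent non-null value
-- per key. Assumes the Blink planner's built-in LAST_VALUE aggregate.
create view person_view as
select
  id,
  LAST_VALUE(name) as name,
  LAST_VALUE(age)  as age
from person_source
group by id;
```

Batching into MongoDB would then be governed by the sink connector's own buffering options (e.g. flush size / flush interval), which depend on which MongoDB connector is actually in use.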
