场景:mysql数据实时同步到mongodb. 上游mysql binlog日志发到一个kafka topic, 不保证同一个主键的记录发到相同的partition,为了保证下游sink mongodb同一主键的所有记录按序保存,所以需要按主键keyby。然后下游再批量写入mongodb。 问题:flink sql有办法解决上述问题?如果可以的话,要怎么写?
create table person_source ( id BIGINT PRIMARY KEY NOT FORCED, name STRING, age BIGINT ) with ( 'connector' = 'kafka', ...... 'format' = 'canal-json' ); create view person_view as select id, ??? from person_source group by id; create table person_sink ( id BIGINT PRIMARY KEY NOT FORCED, name STRING, age BIGINT ) with ( 'connector' = 'mongodb', ...... 'format' = 'json' ); insert into person_sink select * from person_view;
