请问 flink sql cdc 场景下如何增大下游sink端并行度? 我试了修改default.parallism=2参数,并且将operator chain参数设置成false,并没有效果。 而后,我将作业分成两步:首先 源mysql cdc sink到 upsert kafka,再从 upsert kafka sink到 目标mysql。是想通过kafka partition增大sink并行度 初步测试效果是可以的,kafka建了3个partitions,每个partitions都按主键hash分配到数据,下游并行度跟partitions个数对齐。
以下是作业内容: -- source CREATE TABLE mysql_old_order_table ( order_number BIGINT, price DECIMAL, order_time TIMESTAMP(3), PRIMARY KEY (order_number) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = 'root', 'database-name' = 'flink-test', 'table-name' = 'old_order' ); -- sink CREATE TABLE kafka_order_table ( order_number BIGINT, price DECIMAL, order_time TIMESTAMP(3), PRIMARY KEY (order_number) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'order', 'properties.bootstrap.servers' = 'localhost:9092', 'key.format' = 'json', 'value.format' = 'json' ); -- insert INSERT INTO kafka_order_table SELECT * FROM mysql_old_order_table; -- source CREATE TABLE kafka_order_table ( order_number BIGINT, price DECIMAL, order_time TIMESTAMP(3), PRIMARY KEY (order_number) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'order', 'properties.bootstrap.servers' = 'localhost:9092', 'key.format' = 'json', 'value.format' = 'json' ); -- sink CREATE TABLE mysql_order_table ( order_number BIGINT, price DECIMAL, order_time TIMESTAMP(3), PRIMARY KEY (order_number) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/flink-test', 'table-name' = 'order', 'username' = 'root', 'password' = 'root', 'sink.buffer-flush.max-rows' = '3', 'sink.buffer-flush.interval' = '1s' ); -- insert INSERT INTO mysql_order_table SELECT * FROM kafka_order_table; 在 2021-06-08 19:49:40,"Leonard Xu" <xbjt...@gmail.com> 写道: >试着回答下这两个问题。 > >> flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc >> connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同步到目标表需要花费不少时间,而新的业务数据也源源不断进来,导致追不上。这要如何解决? >是的,关键问题是cdc connector为了保证数据一致性只能单并发,所以作业也只能单并发。这个需要cdc >connector支持多并发读取,下游sink自然就能解决。 > > >> flink 1.13的jdbc connector新增 sink.parallism >> 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么? > >这个不仅在同步场景,在其他场景也需要注意 sink.parallism这个参数的使用,目前框架没有保证同一个pk的记录发送到同一个task,需要用户自己保证 >sink 上定义的pk 和上游数据shuffle的key(比如 group key, join key)保持一致, >否则可能导致数据乱序。 这个社区也在从 plan 推导上并解决,可以参考 >https://issues.apache.org/jira/browse/FLINK-20374 ><https://issues.apache.org/jira/browse/FLINK-20374> >https://issues.apache.org/jira/browse/FLINK-22901 ><https://issues.apache.org/jira/browse/FLINK-22901> > >祝好, >Leonard