I'm using Flink 1.11.1 for CDC and found it only writes about 100 rows per second to MySQL. Are there any tuning options, or would you suggest a different approach for batch writes?
The code is as follows:
String sourceDdl = "CREATE TABLE debezium_source (" +
        "  id STRING NOT NULL," +
        "  name STRING," +
        "  description STRING," +
        "  weight DOUBLE" +
        ") WITH (" +
        "  'connector' = 'kafka-0.11'," +
        "  'topic' = 'test0717'," +
        "  'properties.bootstrap.servers' = '172.22.20.206:9092'," +
        "  'scan.startup.mode' = 'group-offsets'," +
        "  'properties.group.id' = 'test'," +
        "  'format' = 'debezium-json'," +
        "  'debezium-json.schema-include' = 'false'," +
        "  'debezium-json.ignore-parse-errors' = 'true'" +
        ")";
tEnv.executeSql(sourceDdl);
System.out.println("init source ddl successful ==>" + sourceDdl);
String sinkDdl = "CREATE TABLE sink (" +
        "  id STRING NOT NULL," +
        "  name STRING," +
        "  description STRING," +
        "  weight DOUBLE," +
        "  PRIMARY KEY (id) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'jdbc'," +
        "  'url' = 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true'," +
        "  'table-name' = 'table-out'," +
        "  'driver' = 'com.mysql.cj.jdbc.Driver'," +
        "  'sink.buffer-flush.interval' = '1s'," +
        "  'sink.buffer-flush.max-rows' = '1000'," +
        "  'username' = 'DataPip'," +
        "  'password' = 'DataPip'" +
        ")";
tEnv.executeSql(sinkDdl);
System.out.println("init sink ddl successful ==>" + sinkDdl);
String dml = "INSERT INTO sink SELECT id,name ,description,
weight FROM debezium_source";
System.out.println("execute dml ==>" + dml);
tEnv.executeSql(dml);
tEnv.executeSql("CREATE TABLE print_table WITH ('connector' =
'print')" +
"LIKE debezium_source (EXCLUDING ALL)");
tEnv.executeSql("INSERT INTO print_table SELECT id,name
,description, weight FROM debezium_source");
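As an alternative batch-write path I also looked at the DataStream JDBC sink from flink-connector-jdbc, where batch size and flush interval are explicit. A minimal sketch, assuming an append-ish DataStream<Row> named rows with fields (id, name, description, weight) — the variable name, field order, and the upsert statement are my assumptions, not taken from the job above; with a primary key the upsert keeps replayed CDC updates from failing:

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.types.Row;

// rows: DataStream<Row> with fields (id, name, description, weight)
rows.addSink(JdbcSink.sink(
        // Upsert so repeated keys update the row instead of violating the PK.
        "INSERT INTO `table-out` (id, name, description, weight) VALUES (?, ?, ?, ?) " +
                "ON DUPLICATE KEY UPDATE name = VALUES(name), " +
                "description = VALUES(description), weight = VALUES(weight)",
        (statement, row) -> {
            statement.setString(1, (String) row.getField(0));
            statement.setString(2, (String) row.getField(1));
            statement.setString(3, (String) row.getField(2));
            statement.setObject(4, row.getField(3));
        },
        JdbcExecutionOptions.builder()
                .withBatchSize(1000)       // flush every 1000 rows...
                .withBatchIntervalMs(1000) // ...or at least once per second
                .withMaxRetries(3)
                .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://127.0.0.1:3306/test?rewriteBatchedStatements=true")
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUsername("DataPip")
                .withPassword("DataPip")
                .build()));

Would either of these be the recommended direction, or is there a better option for bulk writes?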