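Have you observed the sink failing to keep up with writes and causing backpressure?
Alternatively, try increasing the flush interval so that each buffer accumulates more data before it is written.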
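
As a rough sketch of the flush-interval suggestion, the sink DDL from the quoted post could be generated with tunable buffer settings. The helper name `buildSinkDdl` and the values `5s` / `5000` are illustrative assumptions, not measured recommendations. The `rewriteBatchedStatements=true` URL parameter is a MySQL Connector/J option that is not in the original post; it is added here on the assumption that letting the driver rewrite batches into multi-row statements may help, which should be verified against your own workload.

```java
// A minimal sketch: builds the JDBC sink DDL from the quoted post with
// configurable flush settings. Only the string is constructed here; pass the
// result to tEnv.executeSql(...) as in the original code.
class SinkDdlSketch {

    // flushInterval and maxRows are hypothetical parameters for illustration.
    static String buildSinkDdl(String flushInterval, int maxRows) {
        return "CREATE TABLE sink ("
                + " id STRING NOT NULL,"
                + " name STRING,"
                + " description STRING,"
                + " weight DOUBLE,"
                + " PRIMARY KEY (id) NOT ENFORCED"
                + ") WITH ("
                + " 'connector' = 'jdbc',"
                // Assumption: rewriteBatchedStatements is not in the original post.
                + " 'url' = 'jdbc:mysql://127.0.0.1:3306/test"
                + "?autoReconnect=true&rewriteBatchedStatements=true',"
                + " 'table-name' = 'table-out',"
                + " 'driver' = 'com.mysql.cj.jdbc.Driver',"
                // A longer interval and a larger row limit let each buffer grow.
                + " 'sink.buffer-flush.interval' = '" + flushInterval + "',"
                + " 'sink.buffer-flush.max-rows' = '" + maxRows + "',"
                + " 'username' = 'DataPip',"
                + " 'password' = 'DataPip'"
                + ")";
    }

    public static void main(String[] args) {
        // Illustrative values only; tune them while watching backpressure.
        System.out.println(buildSinkDdl("5s", 5000));
    }
}
```

The buffer is flushed when either limit is reached, so raising only one of the two settings may have little effect if the other one triggers first.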

曹武 <[email protected]> wrote on Thursday, July 23, 2020 at 4:48 PM:

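> I am using Flink 1.11.1 for CDC and found that it can only write about 100 rows per second to MySQL. Are there any ways to optimize this, or any other suggestions for batch writing?
> The code is as follows: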
>         String sourceDdl =" CREATE TABLE debezium_source " +
>                 "( " +
>                 "id STRING NOT NULL, name STRING, description STRING, weight Double" +
>                 ") " +
>                 "WITH (" +
>                 " 'connector' = 'kafka-0.11'," +
>                 " 'topic' = 'test0717'," +
>                 " 'properties.bootstrap.servers' = ' 172.22.20.206:9092'," +
>                 "'scan.startup.mode' = 'group-offsets','properties.group.id'='test'," +
>                 "'format' = 'debezium-json'," +
>                 "'debezium-json.schema-include'='false'," +
>                 "'debezium-json.ignore-parse-errors'='true')";
>         tEnv.executeSql(sourceDdl);
>         System.out.println("init source ddl successful ==>" + sourceDdl);
>         String sinkDdl = " CREATE TABLE sink " +
>                 "( " +
>                 "id STRING NOT NULL," +
>                 " name STRING, " +
>                 "description STRING," +
>                 " weight Double," +
>                 " PRIMARY KEY (id) NOT ENFORCED " +
>                 ")" +
>                 " WITH " +
>                 "( " +
>                 "'connector' = 'jdbc', " +
>                 "'url' = 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true', " +
>                 "'table-name' = 'table-out', " +
>                 "'driver'= 'com.mysql.cj.jdbc.Driver'," +
>                 "'sink.buffer-flush.interval'='1s'," +
>                 "'sink.buffer-flush.max-rows'='1000'," +
>                 "'username'='DataPip', " +
>                 "'password'='DataPip')";
>         tEnv.executeSql(sinkDdl);
>         System.out.println("init sink ddl successful ==>" + sinkDdl);
>
>         String dml = "INSERT INTO sink SELECT id, name, description, weight FROM debezium_source";
>         System.out.println("execute dml  ==>" + dml);
>         tEnv.executeSql(dml);
>         tEnv.executeSql("CREATE TABLE print_table WITH ('connector' = 'print') " +
>                 "LIKE debezium_source (EXCLUDING ALL)");
>         tEnv.executeSql("INSERT INTO print_table SELECT id, name, description, weight FROM debezium_source");
