kafka 数据源生产数据的速率是多少呢? 会不会数据源就是每秒100条数据呢。。。?
Btw, 查看反压状态是一个比较好的排查方式。

On Thu, 23 Jul 2020 at 20:25, godfrey he <[email protected]> wrote:

> 你观察到有sink写不过来导致反压吗?
> 或者你调大flush interval试试,让每个buffer攒更多的数据
>
> 曹武 <[email protected]> 于2020年7月23日周四 下午4:48写道:
>
> > 我使用fink 1.11.1 做cdc,发现一秒钟只能写100条左右数据到mysql,请问有优化方案,或者是其他的批量写入的方案建议嘛
> > 代码如下:
> >         String sourceDdl =" CREATE TABLE debezium_source " +
> >                 "( " +
> >                 "id STRING NOT NULL, name STRING, description STRING,
> > weight
> > Double" +
> >                 ") " +
> >                 "WITH (" +
> >                 " 'connector' = 'kafka-0.11'," +
> >                 " 'topic' = 'test0717'," +
> >                 " 'properties.bootstrap.servers' = ' 172.22.20.206:9092
> ',
> > "
> > +
> >                 "'scan.startup.mode' =
> > 'group-offsets','properties.group.id'='test'," +
> >                 "'format' = 'debezium-json'," +
> >                 "'debezium-json.schema-include'='false'," +
> >                 "'debezium-json.ignore-parse-errors'='true')";
> >         tEnv.executeSql(sourceDdl);
> >         System.out.println("init source ddl successful ==>" + sourceDdl);
> >         String sinkDdl = " CREATE TABLE sink " +
> >                 "( " +
> >                 "id STRING NOT NULL," +
> >                 " name STRING, " +
> >                 "description STRING," +
> >                 " weight Double," +
> >                 " PRIMARY KEY (id) NOT ENFORCED " +
> >                 ")" +
> >                 " WITH " +
> >                 "( " +
> >                 "'connector' = 'jdbc', " +
> >                 "'url' =
> > 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true', " +
> >                 "'table-name' = 'table-out', " +
> >                 "'driver'= 'com.mysql.cj.jdbc.Driver'," +
> >                 "'sink.buffer-flush.interval'='1s'," +
> >                 "'sink.buffer-flush.max-rows'='1000'," +
> >                 "'username'='DataPip', " +
> >                 "'password'='DataPip')";
> >         tEnv.executeSql(sinkDdl);
> >         System.out.println("init sink ddl successful ==>" + sinkDdl);
> >
> >          String dml = "INSERT INTO sink SELECT  id,name ,description,
> > weight FROM debezium_source";
> >         System.out.println("execute dml  ==>" + dml);
> >         tEnv.executeSql(dml);
> >         tEnv.executeSql("CREATE TABLE print_table WITH ('connector' =
> > 'print')" +
> >                 "LIKE debezium_source (EXCLUDING ALL)");
> >         tEnv.executeSql("INSERT INTO print_table SELECT  id,name
> > ,description,  weight FROM debezium_source");
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>

回复