kafka 数据源生产数据的速率是多少呢? 会不会数据源就是每秒100条数据呢。。。? Btw, 查看反压状态是一个比较好的排查方式。
On Thu, 23 Jul 2020 at 20:25, godfrey he <[email protected]> wrote: > 你观察到有sink写不过来导致反压吗? > 或者你调大flush interval试试,让每个buffer攒更多的数据 > > 曹武 <[email protected]> 于2020年7月23日周四 下午4:48写道: > > > 我使用fink 1.11.1 做cdc,发现一秒钟只能写100条左右数据到mysql,请问有优化方案,或者是其他的批量写入的方案建议嘛 > > 代码如下: > > String sourceDdl =" CREATE TABLE debezium_source " + > > "( " + > > "id STRING NOT NULL, name STRING, description STRING, > > weight > > Double" + > > ") " + > > "WITH (" + > > " 'connector' = 'kafka-0.11'," + > > " 'topic' = 'test0717'," + > > " 'properties.bootstrap.servers' = ' 172.22.20.206:9092 > ', > > " > > + > > "'scan.startup.mode' = > > 'group-offsets','properties.group.id'='test'," + > > "'format' = 'debezium-json'," + > > "'debezium-json.schema-include'='false'," + > > "'debezium-json.ignore-parse-errors'='true')"; > > tEnv.executeSql(sourceDdl); > > System.out.println("init source ddl successful ==>" + sourceDdl); > > String sinkDdl = " CREATE TABLE sink " + > > "( " + > > "id STRING NOT NULL," + > > " name STRING, " + > > "description STRING," + > > " weight Double," + > > " PRIMARY KEY (id) NOT ENFORCED " + > > ")" + > > " WITH " + > > "( " + > > "'connector' = 'jdbc', " + > > "'url' = > > 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true', " + > > "'table-name' = 'table-out', " + > > "'driver'= 'com.mysql.cj.jdbc.Driver'," + > > "'sink.buffer-flush.interval'='1s'," + > > "'sink.buffer-flush.max-rows'='1000'," + > > "'username'='DataPip', " + > > "'password'='DataPip')"; > > tEnv.executeSql(sinkDdl); > > System.out.println("init sink ddl successful ==>" + sinkDdl); > > > > String dml = "INSERT INTO sink SELECT id,name ,description, > > weight FROM debezium_source"; > > System.out.println("execute dml ==>" + dml); > > tEnv.executeSql(dml); > > tEnv.executeSql("CREATE TABLE print_table WITH ('connector' = > > 'print')" + > > "LIKE debezium_source (EXCLUDING ALL)"); > > tEnv.executeSql("INSERT INTO print_table SELECT id,name > > ,description, weight FROM debezium_source"); > > > > > > > > -- > > Sent from: http://apache-flink.147419.n8.nabble.com/ > > >
