你有观察到 sink 写不过来导致的反压吗?或者可以试试调大 flush interval(即 'sink.buffer-flush.interval'),让每个 buffer 攒更多的数据后再批量写入。
曹武 <[email protected]> 于2020年7月23日周四 下午4:48写道: > 我使用Flink 1.11.1 做cdc,发现一秒钟只能写100条左右数据到mysql,请问有优化方案,或者是其他的批量写入的方案建议嘛 > 代码如下: > String sourceDdl =" CREATE TABLE debezium_source " + > "( " + > "id STRING NOT NULL, name STRING, description STRING, > weight > Double" + > ") " + > "WITH (" + > " 'connector' = 'kafka-0.11'," + > " 'topic' = 'test0717'," + > " 'properties.bootstrap.servers' = ' 172.22.20.206:9092', > " > + > "'scan.startup.mode' = > 'group-offsets','properties.group.id'='test'," + > "'format' = 'debezium-json'," + > "'debezium-json.schema-include'='false'," + > "'debezium-json.ignore-parse-errors'='true')"; > tEnv.executeSql(sourceDdl); > System.out.println("init source ddl successful ==>" + sourceDdl); > String sinkDdl = " CREATE TABLE sink " + > "( " + > "id STRING NOT NULL," + > " name STRING, " + > "description STRING," + > " weight Double," + > " PRIMARY KEY (id) NOT ENFORCED " + > ")" + > " WITH " + > "( " + > "'connector' = 'jdbc', " + > "'url' = > 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true', " + > "'table-name' = 'table-out', " + > "'driver'= 'com.mysql.cj.jdbc.Driver'," + > "'sink.buffer-flush.interval'='1s'," + > "'sink.buffer-flush.max-rows'='1000'," + > "'username'='DataPip', " + > "'password'='DataPip')"; > tEnv.executeSql(sinkDdl); > System.out.println("init sink ddl successful ==>" + sinkDdl); > > String dml = "INSERT INTO sink SELECT id,name ,description, > weight FROM debezium_source"; > System.out.println("execute dml ==>" + dml); > tEnv.executeSql(dml); > tEnv.executeSql("CREATE TABLE print_table WITH ('connector' = > 'print')" + > "LIKE debezium_source (EXCLUDING ALL)"); > tEnv.executeSql("INSERT INTO print_table SELECT id,name > ,description, weight FROM debezium_source"); > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
