我使用 Flink 1.11.1 做 CDC,发现一秒钟只能写 100 条左右数据到 MySQL,请问有优化方案,或者其他批量写入的方案建议吗?
代码如下:
        // Pipeline: read Debezium change events from Kafka, upsert them into MySQL
        // through the JDBC connector, and mirror the same stream to a print sink.
        //
        // Fixes relative to the posted snippet:
        //  * String literals that had been split across lines by mail wrapping are
        //    rejoined so the fragment is valid Java again.
        //  * The JDBC URL now carries rewriteBatchedStatements=true. Without it,
        //    MySQL Connector/J sends every statement of a JDBC batch on its own,
        //    so the sink.buffer-flush.* options below do not produce real batched
        //    writes and throughput stays low.

        // Source: Kafka topic holding Debezium JSON change events.
        String sourceDdl = " CREATE TABLE debezium_source " +
                "( " +
                "id STRING NOT NULL, name STRING, description STRING, weight Double" +
                ") " +
                "WITH (" +
                " 'connector' = 'kafka-0.11'," +
                " 'topic' = 'test0717'," +
                " 'properties.bootstrap.servers' = ' 172.22.20.206:9092', " +
                "'scan.startup.mode' = 'group-offsets','properties.group.id'='test'," +
                "'format' = 'debezium-json'," +
                "'debezium-json.schema-include'='false'," +
                "'debezium-json.ignore-parse-errors'='true')";
        tEnv.executeSql(sourceDdl);
        System.out.println("init source ddl successful ==>" + sourceDdl);

        // Sink: MySQL table written through the JDBC connector. The declared
        // primary key lets the connector write the changelog in upsert mode.
        String sinkDdl = " CREATE TABLE sink " +
                "( " +
                "id STRING NOT NULL," +
                " name STRING, " +
                "description STRING," +
                " weight Double," +
                " PRIMARY KEY (id) NOT ENFORCED " +
                ")" +
                " WITH " +
                "( " +
                "'connector' = 'jdbc', " +
                // rewriteBatchedStatements=true lets the driver collapse a batch
                // into multi-row statements instead of one round trip per row.
                "'url' = 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true&rewriteBatchedStatements=true', " +
                "'table-name' = 'table-out', " +
                "'driver'= 'com.mysql.cj.jdbc.Driver'," +
                // Buffered rows are flushed when either threshold is reached.
                "'sink.buffer-flush.interval'='1s'," +
                "'sink.buffer-flush.max-rows'='1000'," +
                "'username'='DataPip', " +
                "'password'='DataPip')";
        tEnv.executeSql(sinkDdl);
        System.out.println("init sink ddl successful ==>" + sinkDdl);

        // Job 1: copy every change from the Kafka source into the MySQL sink.
        String dml = "INSERT INTO sink SELECT  id,name ,description, weight FROM debezium_source";
        System.out.println("execute dml  ==>" + dml);
        tEnv.executeSql(dml);

        // Job 2: echo the same stream to stdout for debugging. The print table
        // takes its columns from debezium_source (EXCLUDING ALL drops its options).
        // NOTE(review): each executeSql(INSERT ...) submits a separate job, so the
        // topic is consumed twice; drop this job when measuring sink throughput.
        tEnv.executeSql("CREATE TABLE print_table WITH ('connector' = 'print')" +
                "LIKE debezium_source (EXCLUDING ALL)");
        tEnv.executeSql("INSERT INTO print_table SELECT  id,name ,description,  weight FROM debezium_source");



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复