用FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里,两个表都有主键,表结构相同 代码如下:控制台打印情况见附件 public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.setInteger("rest.port", 10041); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3"); env.setParallelism(1); // 建表 tenv.executeSql("CREATE TABLE flink_t_stu ( " + " userid INT, " + " username string, " + " age string, " + " `partition` INT, " + " PRIMARY KEY(userid) NOT ENFORCED " + " ) WITH ( " + " 'connector' = 'mysql-cdc', " + " 'server-id' = '5401-5404', " + " 'scan.startup.mode' = 'latest-offset', " + // " 'scan.startup.mode' = 'earliest-offset', " + " 'hostname' = '192.168.0.220', " + " 'port' = '3306', " + " 'username' = 'root', " + " 'password' = 'root', " + " 'database-name' = 'zy', " + " 'table-name' = 't_stu' " + ")"); // 查询 tenv.executeSql("select * from flink_t_stu").print(); // 建一个目标表,用来存放查询结果 tenv.executeSql( "CREATE TABLE flink_t_stu2 ( " + " userid INT, " + " username string, " + " age string, " + " `partition` INT, " + " PRIMARY KEY(userid) NOT ENFORCED " + " ) WITH ( " + " 'connector' = 'jdbc', " + " 'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " + " 'table-name' = 't_stu2', " + " 'username' = 'root', " + " 'password' = 'root' " + ")" ); tenv.executeSql("INSERT INTO flink_t_stu2 " + "SELECT * FROM flink_t_stu"); env.execute(); }