用FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里,两个表都有主键,表结构相同
代码如下:控制台打印情况见附件
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 10041);
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(conf);
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");

env.setParallelism(1);
// 建表
tenv.executeSql("CREATE TABLE flink_t_stu ( " +
"      userid INT, " +
"      username string, " +
"      age string, " +
"      `partition` INT, " +
"     PRIMARY KEY(userid) NOT ENFORCED " +
"     ) WITH ( " +
"     'connector' = 'mysql-cdc', " +
"     'server-id' = '5401-5404', " +
"     'scan.startup.mode' = 'latest-offset', " +
//                "     'scan.startup.mode' = 'earliest-offset', " +
"     'hostname' = '192.168.0.220', " +
"     'port' = '3306', " +
"     'username' = 'root', " +
"     'password' = 'root', " +
"     'database-name' = 'zy', " +
"     'table-name' = 't_stu' " +
")");

// 查询
tenv.executeSql("select * from flink_t_stu").print();


// 建一个目标表,用来存放查询结果
tenv.executeSql(
"CREATE TABLE flink_t_stu2 ( " +
"      userid INT, " +
"      username string, " +
"      age string, " +
"      `partition` INT, " +
"     PRIMARY KEY(userid) NOT ENFORCED " +
"     ) WITH ( " +
"  'connector' = 'jdbc', " +
"  'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " +
"  'table-name' = 't_stu2', " +
"  'username' = 'root', " +
"  'password' = 'root'  " +
")"
);

tenv.executeSql("INSERT INTO flink_t_stu2 " +
"SELECT * FROM flink_t_stu");
env.execute();

    }

回复