你用的是哪个版本的 cdc,增量部分的数据需要全量的部分读完才能进行。
Best, Shengkai 左岩 <13520871...@163.com> 于2022年11月4日周五 17:58写道: > > > > > > > > > > .print(); 去掉也不行, > > 跟这个也没有关系(一个tableEnv要执行2个DML),而且现在有一个现象,就是现在改了mysql表数据,flinkcdc得等好久才能读到变化数据,这是什么情况呢 > > > > > > > > > 在 2022-11-04 16:52:08,"yinghua...@163.com" <yinghua...@163.com> 写道: > > >你这个是在一个tableEnv要执行2个DML,要使用StatementSet保存2个DML语句,在StatementSet上执行exuectute,如下 > > StatementSet statementSet = tenv.createStatementSet(); > > statementSet.addInsertSql(sql1); > > statementSet.addInsertSql(sql2); > > TableResult result = statementSet.execute(); > > result.getJobClient().get().getJobID().toString(); > > > > > >或者你把打印的那个任务给去掉,看能否将数据插入到目的的mysql中。 > >// 查询 > >tenv.executeSql("select * from flink_t_stu").print(); > --------------------这个任务给去掉 > > > > > > > >yinghua...@163.com > > > >发件人: 左岩 > >发送时间: 2022-11-04 14:34 > >收件人: user-zh > >主题: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里 > >用FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里,两个表都有主键,表结构相同 > >代码如下:控制台打印情况见附件 > >public static void main(String[] args) throws Exception { > >Configuration conf = new Configuration(); > >conf.setInteger("rest.port", 10041); > >StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(conf); > >StreamTableEnvironment tenv = StreamTableEnvironment.create(env); > > > >env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE); > >env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3"); > > > >env.setParallelism(1); > >// 建表 > >tenv.executeSql("CREATE TABLE flink_t_stu ( " + > >" userid INT, " + > >" username string, " + > >" age string, " + > >" `partition` INT, " + > >" PRIMARY KEY(userid) NOT ENFORCED " + > >" ) WITH ( " + > >" 'connector' = 'mysql-cdc', " + > >" 'server-id' = '5401-5404', " + > >" 'scan.startup.mode' = 'latest-offset', " + > >// " 'scan.startup.mode' = 'earliest-offset', " + > >" 'hostname' = '192.168.0.220', " + > >" 'port' = '3306', " + > >" 'username' = 'root', " + > >" 'password' = 'root', " + > >" 'database-name' = 'zy', " + > >" 'table-name' = 't_stu' " + > >")"); > > > >// 查询 > >tenv.executeSql("select * from flink_t_stu").print(); > > > > > >// 建一个目标表,用来存放查询结果 > >tenv.executeSql( > >"CREATE TABLE flink_t_stu2 ( " + > >" userid INT, " + > >" username string, " + > >" age string, " + > >" `partition` INT, " + > >" PRIMARY KEY(userid) NOT ENFORCED " + > >" ) WITH ( " + > >" 'connector' = 'jdbc', " + > >" 'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " + > >" 'table-name' = 't_stu2', " + > >" 'username' = 'root', " + > >" 'password' = 'root' " + > >")" > >); > > > >tenv.executeSql("INSERT INTO flink_t_stu2 " + > >"SELECT * FROM flink_t_stu"); > >env.execute(); > > > > } >