CDC是自己编译的2.3,对应flink1.14的版本,还有一个问题是,可以读到变更数据。比如11点30写入mysql,flinkcdc读出来要慢几分钟,5~7分钟之后才能读到新写入或者变更的数据,第二个问题就行,变更数据插不到另外一张mysql表里
在 2022-11-07 10:11:56,"Shengkai Fang" <fskm...@gmail.com> 写道: >你用的是哪个版本的 cdc,增量部分的数据需要全量的部分读完才能进行。 > >Best, >Shengkai > >左岩 <13520871...@163.com> 于2022年11月4日周五 17:58写道: > >> >> >> >> >> >> >> >> >> >> .print(); 去掉也不行, >> >> 跟这个也没有关系(一个tableEnv要执行2个DML),而且现在有一个现象,就是现在改了mysql表数据,flinkcdc得等好久才能读到变化数据,这是什么情况呢 >> >> >> >> >> >> >> >> >> 在 2022-11-04 16:52:08,"yinghua...@163.com" <yinghua...@163.com> 写道: >> >> >你这个是在一个tableEnv要执行2个DML,要使用StatementSet保存2个DML语句,在StatementSet上执行exuectute,如下 >> > StatementSet statementSet = tenv.createStatementSet(); >> > statementSet.addInsertSql(sql1); >> > statementSet.addInsertSql(sql2); >> > TableResult result = statementSet.execute(); >> > result.getJobClient().get().getJobID().toString(); >> > >> > >> >或者你把打印的那个任务给去掉,看能否将数据插入到目的的mysql中。 >> >// 查询 >> >tenv.executeSql("select * from flink_t_stu").print(); >> --------------------这个任务给去掉 >> > >> > >> > >> >yinghua...@163.com >> > >> >发件人: 左岩 >> >发送时间: 2022-11-04 14:34 >> >收件人: user-zh >> >主题: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里 >> >用FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里,两个表都有主键,表结构相同 >> >代码如下:控制台打印情况见附件 >> >public static void main(String[] args) throws Exception { >> >Configuration conf = new Configuration(); >> >conf.setInteger("rest.port", 10041); >> >StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(conf); >> >StreamTableEnvironment tenv = StreamTableEnvironment.create(env); >> > >> >env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE); >> >env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3"); >> > >> >env.setParallelism(1); >> >// 建表 >> >tenv.executeSql("CREATE TABLE flink_t_stu ( " + >> >" userid INT, " + >> >" username string, " + >> >" age string, " + >> >" `partition` INT, " + >> >" PRIMARY KEY(userid) NOT ENFORCED " + >> >" ) WITH ( " + >> >" 'connector' = 'mysql-cdc', " + >> >" 'server-id' = '5401-5404', " + >> >" 'scan.startup.mode' = 'latest-offset', " + >> >// " 'scan.startup.mode' = 'earliest-offset', " + >> >" 'hostname' = '192.168.0.220', " + >> >" 'port' = '3306', " + >> >" 'username' = 'root', " + >> >" 'password' = 'root', " + >> >" 'database-name' = 'zy', " + >> >" 'table-name' = 't_stu' " + >> >")"); >> > >> >// 查询 >> >tenv.executeSql("select * from flink_t_stu").print(); >> > >> > >> >// 建一个目标表,用来存放查询结果 >> >tenv.executeSql( >> >"CREATE TABLE flink_t_stu2 ( " + >> >" userid INT, " + >> >" username string, " + >> >" age string, " + >> >" `partition` INT, " + >> >" PRIMARY KEY(userid) NOT ENFORCED " + >> >" ) WITH ( " + >> >" 'connector' = 'jdbc', " + >> >" 'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " + >> >" 'table-name' = 't_stu2', " + >> >" 'username' = 'root', " + >> >" 'password' = 'root' " + >> >")" >> >); >> > >> >tenv.executeSql("INSERT INTO flink_t_stu2 " + >> >"SELECT * FROM flink_t_stu"); >> >env.execute(); >> > >> > } >>