你用的是哪个版本的 cdc,增量部分的数据需要全量的部分读完才能进行。

Best,
Shengkai

左岩 <13520871...@163.com> 于2022年11月4日周五 17:58写道:

>
>
>
>
>
>
>
>
>
> .print(); 去掉也不行,
>  
> 跟这个也没有关系(一个tableEnv要执行2个DML),而且现在有一个现象,就是现在改了mysql表数据,flinkcdc得等好久才能读到变化数据,这是什么情况呢
>
>
>
>
>
>
>
>
> 在 2022-11-04 16:52:08,"yinghua...@163.com" <yinghua...@163.com> 写道:
>
> >你这个是在一个tableEnv要执行2个DML,要使用StatementSet保存2个DML语句,在StatementSet上执行exuectute,如下
> >    StatementSet statementSet = tenv.createStatementSet();
> >    statementSet.addInsertSql(sql1);
> >    statementSet.addInsertSql(sql2);
> >    TableResult result = statementSet.execute();
> >    result.getJobClient().get().getJobID().toString();
> >
> >
> >或者你把打印的那个任务给去掉,看能否将数据插入到目的的mysql中。
> >// 查询
> >tenv.executeSql("select * from flink_t_stu").print();
>  --------------------这个任务给去掉
> >
> >
> >
> >yinghua...@163.com
> >
> >发件人: 左岩
> >发送时间: 2022-11-04 14:34
> >收件人: user-zh
> >主题: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里
> >用FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里,两个表都有主键,表结构相同
> >代码如下:控制台打印情况见附件
> >public static void main(String[] args) throws Exception {
> >Configuration conf = new Configuration();
> >conf.setInteger("rest.port", 10041);
> >StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment(conf);
> >StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> >
> >env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
> >env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
> >
> >env.setParallelism(1);
> >// 建表
> >tenv.executeSql("CREATE TABLE flink_t_stu ( " +
> >"      userid INT, " +
> >"      username string, " +
> >"      age string, " +
> >"      `partition` INT, " +
> >"     PRIMARY KEY(userid) NOT ENFORCED " +
> >"     ) WITH ( " +
> >"     'connector' = 'mysql-cdc', " +
> >"     'server-id' = '5401-5404', " +
> >"     'scan.startup.mode' = 'latest-offset', " +
> >//                "     'scan.startup.mode' = 'earliest-offset', " +
> >"     'hostname' = '192.168.0.220', " +
> >"     'port' = '3306', " +
> >"     'username' = 'root', " +
> >"     'password' = 'root', " +
> >"     'database-name' = 'zy', " +
> >"     'table-name' = 't_stu' " +
> >")");
> >
> >// 查询
> >tenv.executeSql("select * from flink_t_stu").print();
> >
> >
> >// 建一个目标表,用来存放查询结果
> >tenv.executeSql(
> >"CREATE TABLE flink_t_stu2 ( " +
> >"      userid INT, " +
> >"      username string, " +
> >"      age string, " +
> >"      `partition` INT, " +
> >"     PRIMARY KEY(userid) NOT ENFORCED " +
> >"     ) WITH ( " +
> >"  'connector' = 'jdbc', " +
> >"  'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " +
> >"  'table-name' = 't_stu2', " +
> >"  'username' = 'root', " +
> >"  'password' = 'root'  " +
> >")"
> >);
> >
> >tenv.executeSql("INSERT INTO flink_t_stu2 " +
> >"SELECT * FROM flink_t_stu");
> >env.execute();
> >
> >    }
>

回复