Hi 曹武,

This is a known bug; it is fixed in 1.11.1 and 1.12.0. If you need it urgently, you can build the release-1.11 branch yourself.
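For example (a minimal sketch, assuming git and Maven are available; any flags beyond -DskipTests are optional):

    git clone https://github.com/apache/flink.git
    cd flink
    git checkout release-1.11
    # Skip tests to speed up the build; the built distribution ends up
    # under flink-dist/target/ (with a build-target symlink at the project root).
    mvn clean install -DskipTests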
Best,
Leonard Xu

https://issues.apache.org/jira/browse/FLINK-18461

> On 17 Jul 2020, at 17:12, 曹武 <[email protected]> wrote:
>
> It seems to be because recovery from the checkpoint failed, or because the checkpoint files do not contain the intermediate GROUP BY results. How can I troubleshoot this?
>
> godfrey he wrote
>> Why GROUP BY id, name, description, weight at all?
>> Wouldn't a plain "INSERT INTO sink SELECT id, name, description, weight FROM
>> debezium_source" meet the requirement?
>>
>> 曹武 <[email protected]> wrote on Thu, 16 Jul 2020 at 21:30:
>>
>>> I am using the DDL API of Flink 1.11.0 with debezium-json for CDC.
>>> After recovering from a checkpoint, newly arriving op=d records fail to be deleted.
>>> Restart command: ./bin/flink run -m yarn-cluster /root/bigdata-flink-1.0.jar -s
>>> hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata
>>> Code:
>>> EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>         .useBlinkPlanner()
>>>         .inStreamingMode()
>>>         .build();
>>>
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
>>> env.getCheckpointConfig().setCheckpointTimeout(6000L);            // checkpoint timeout
>>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);         // max concurrent checkpoints
>>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L);     // minimum pause between checkpoints
>>> env.getCheckpointConfig().setPreferCheckpointForRecovery(true);   // prefer checkpoints for failure recovery
>>> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); // tolerable checkpoint failures
>>> // checkpoint file cleanup policy
>>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>> // externalized checkpoint path
>>> env.setStateBackend(new FsStateBackend(new URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
>>> // TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));  <- truncated in the original message, apparently the tail of a setRestartStrategy(...) call
>>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>>>
>>> String sourceDDL = String.format(
>>>         "CREATE TABLE debezium_source (" +
>>>         "  id INT NOT NULL," +
>>>         "  name STRING," +
>>>         "  description STRING," +
>>>         "  weight DOUBLE" +
>>>         ") WITH (" +
>>>         "  'connector' = 'kafka-0.11'," +
>>>         "  'topic' = '%s'," +
>>>         "  'properties.bootstrap.servers' = '%s'," +
>>>         "  'scan.startup.mode' = 'group-offsets'," +
>>>         "  'format' = 'debezium-json'" +
>>>         ")", "ddd", "172.22.20.206:9092");
>>>
>>> String sinkDDL = "CREATE TABLE sink (" +
>>>         "  id INT NOT NULL," +
>>>         "  name STRING," +
>>>         "  description STRING," +
>>>         "  weight DOUBLE," +
>>>         "  PRIMARY KEY (id, name, description, weight) NOT ENFORCED" +
>>>         ") WITH (" +
>>>         "  'connector' = 'jdbc'," +
>>>         "  'url' = 'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," +
>>>         "  'table-name' = 'products'," +
>>>         "  'driver' = 'com.mysql.cj.jdbc.Driver'," +
>>>         "  'username' = 'DataPip'," +
>>>         "  'password' = 'DataPip'" +
>>>         ")";
>>>
>>> String dml = "INSERT INTO sink SELECT id, name, description, weight " +
>>>         "FROM debezium_source GROUP BY id, name, description, weight";
>>>
>>> tEnv.executeSql(sourceDDL);
>>> tEnv.executeSql(sinkDDL);
>>> tEnv.executeSql(dml);
>>>
>>> --
>>> Sent from: http://apache-flink.147419.n8.nabble.com/
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
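P.S. On godfrey's point above: the debezium-json format already turns the Debezium payload into a changelog (including deletes for op=d records), so for a plain one-to-one sync the GROUP BY is unnecessary. A minimal sketch of the simplified statement, using the same tables as in the quoted code (illustrative only, not the fix for this bug):

    String dml = "INSERT INTO sink SELECT id, name, description, weight FROM debezium_source";
    tEnv.executeSql(dml);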
