Hi, 曹武

This is a known bug; it has already been fixed in 1.11.1 and 1.12.0.

If you need it urgently, you can build the release-1.11 branch yourself.
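A rough sketch of the build, assuming Git, a JDK 8, and Maven are already installed (the mvn invocation is the standard Flink from-source build; the rest is just illustration):

    git clone https://github.com/apache/flink.git
    cd flink
    git checkout release-1.11
    mvn clean install -DskipTests

The built distribution should then appear under flink-dist/target/.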

Best regards,
Leonard Xu

https://issues.apache.org/jira/browse/FLINK-18461 

> On Jul 17, 2020, at 17:12, 曹武 <[email protected]> wrote:
> 
> It seems the cause is that the restore from the checkpoint failed, or that the checkpoint file does not contain the intermediate GROUP BY results. How should I troubleshoot this?
> 
> godfrey he wrote
>> Why do you GROUP BY id, name, description, weight?
>> Wouldn't a plain "INSERT INTO sink SELECT id, name, description, weight FROM
>> debezium_source" meet the requirement?
>> 
>> 曹武 <14701319164@> wrote on Thursday, July 16, 2020, at 9:30 PM:
>> 
>>> When I use the DDL part of Flink 1.11.0 with debezium-json for CDC, after
>>> restoring from a checkpoint, newly arriving op=d records fail to be deleted.
>>> Restart command: ./bin/flink run -m yarn-cluster /root/bigdata-flink-1.0.jar -s
>>> 
>>> hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata
>>> Code:   EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>                .useBlinkPlanner()
>>>                .inStreamingMode()
>>>                .build();
>>> 
>>>        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> 
>>>        env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
>>>        env.getCheckpointConfig().setCheckpointTimeout(6000L); // checkpoint timeout
>>>        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // max concurrent checkpoints
>>>        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L); // minimum pause between checkpoints
>>>        env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // prefer checkpoints for failure recovery
>>>        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); // tolerable checkpoint failures
>>>        // checkpoint file cleanup policy
>>>        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>        // externalized checkpoint path
>>>        env.setStateBackend(new FsStateBackend(new URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
>>>        // (a line is truncated here in the original mail; the surviving fragment
>>>        // "TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));" looks like the
>>>        // tail of an env.setRestartStrategy(...) call)
>>>        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>>>        String sourceDDL = String.format(
>>>                "CREATE TABLE debezium_source (" +
>>>                        " id INT NOT NULL," +
>>>                        " name STRING," +
>>>                        " description STRING," +
>>>                        " weight Double" +
>>>                        ") WITH (" +
>>>                        " 'connector' = 'kafka-0.11'," +
>>>                        " 'topic' = '%s'," +
>>>                        " 'properties.bootstrap.servers' = '%s'," +
>>>                        " 'scan.startup.mode' = 'group-offsets'," +
>>>                        " 'format' = 'debezium-json'" +
>>>                        ")", "ddd", " 172.22.20.206:9092");
>>>        String sinkDDL = "CREATE TABLE sink (" +
>>>                " id INT NOT NULL," +
>>>                " name STRING," +
>>>                " description STRING," +
>>>                " weight Double," +
>>>                " PRIMARY KEY (id,name, description,weight) NOT ENFORCED
>>> "
>>> +
>>>                ") WITH (" +
>>>                " 'connector' = 'jdbc'," +
>>>                " 'url' =
>>> 'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," +
>>>                " 'table-name' = 'products'," +
>>>                " 'driver'= 'com.mysql.cj.jdbc.Driver'," +
>>>                " 'username'='DataPip'," +
>>>                " 'password'='DataPip'" +
>>>                ")";
>>>        String dml = "INSERT INTO sink SELECT id, name, description, weight" +
>>>                " FROM debezium_source GROUP BY id, name, description, weight";
>>>        tEnv.executeSql(sourceDDL);
>>>        tEnv.executeSql(sinkDDL);
>>>        tEnv.executeSql(dml);
>>> 
>>> 
>>> 
