我在使用 flink 1.11.0 的 DDL 部分、采用 debezium-json 做 CDC 的时候，
从 checkpoint 恢复以后，新来 op=d 的数据会删除失败。
重启命令：./bin/flink run -m yarn-cluster -s
hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata /root/bigdata-flink-1.0.jar
（注意：-s 选项必须放在 jar 包路径之前；放在 jar 之后会被当作程序参数，作业不会真正从 checkpoint 恢复。）
代码:   EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(6000L); // 超时时间
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //
最大允许同时出现几个CheckPoint
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L); //
最小得间隔时间
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true); //
是否倾向于用CheckPoint做故障恢复
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); //
容忍多少次CheckPoint失败
        //Checkpoint文件清理策略
       
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //Checkpoint外部文件路径
        env.setStateBackend(new FsStateBackend(new
URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
settings);
        String sourceDDL = String.format(
                "CREATE TABLE debezium_source (" +
                        " id INT NOT NULL," +
                        " name STRING," +
                        " description STRING," +
                        " weight Double" +
                        ") WITH (" +
                        " 'connector' = 'kafka-0.11'," +
                        " 'topic' = '%s'," +
                        " 'properties.bootstrap.servers' = '%s'," +
                        " 'scan.startup.mode' = 'group-offsets'," +
                        " 'format' = 'debezium-json'" +
                        ")", "ddd", " 172.22.20.206:9092");
        String sinkDDL = "CREATE TABLE sink (" +
                " id INT NOT NULL," +
                " name STRING," +
                " description STRING," +
                " weight Double," +
                " PRIMARY KEY (id,name, description,weight) NOT ENFORCED " +
                ") WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' =
'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," +
                " 'table-name' = 'products'," +
                " 'driver'= 'com.mysql.cj.jdbc.Driver'," +
                " 'username'='DataPip'," +
                " 'password'='DataPip'" +
                ")";
        String dml = "INSERT INTO sink SELECT  id,name ,description, weight
FROM debezium_source GROUP BY id,name ,description, weight";
        tEnv.executeSql(sourceDDL);
        tEnv.executeSql(sinkDDL);
        tEnv.executeSql(dml);



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复