我在使用 flink 1.11.0 中的 DDL 部分,采用 debezium-json 做 CDC 的时候
从checkpoint恢复以后,新来op=d的数据会删除失败
重启命令:./bin/flink run -m yarn-cluster /root/bigdata-flink-1.0.jar -s
hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata
代码:
// Blink planner in streaming mode (required for CDC formats in Flink 1.11).
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 1s with exactly-once semantics.
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(6000L);            // checkpoint timeout (ms)
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);         // at most one in-flight checkpoint
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L);     // min pause between checkpoints (ms)
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);   // prefer checkpoints over savepoints for recovery
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); // tolerated consecutive checkpoint failures
// Retain externalized checkpoints on cancellation so the job can be restored with `flink run -s <path>`.
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Checkpoint state is written to HDFS (second arg: async snapshots disabled).
env.setStateBackend(new FsStateBackend(
        new URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
// NOTE(review): the original mail lost a line here — the dangling fragment
// "TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));" looks like the tail of a
// truncated restart-strategy call, presumably something like:
//   env.setRestartStrategy(RestartStrategies.failureRateRestart(
//           3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));
// TODO confirm the original strategy and values with the author.
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

// Kafka source consuming Debezium CDC change events (insert/update/delete ops).
String sourceDDL = String.format(
        "CREATE TABLE debezium_source (" +
        " id INT NOT NULL," +
        " name STRING," +
        " description STRING," +
        " weight Double" +
        ") WITH (" +
        " 'connector' = 'kafka-0.11'," +
        " 'topic' = '%s'," +
        " 'properties.bootstrap.servers' = '%s'," +
        " 'scan.startup.mode' = 'group-offsets'," +
        " 'format' = 'debezium-json'" +
        ")", "ddd", " 172.22.20.206:9092");

// JDBC upsert sink into MySQL.
// NOTE(review): the PRIMARY KEY spans all four columns, including a DOUBLE.
// The JDBC sink issues DELETEs keyed on the declared primary key, so if the
// MySQL `products` table is actually keyed on `id` alone (or if the double
// value round-trips imprecisely through JSON), the DELETE will match no row
// and op=d records appear to "fail" after recovery — verify the sink PK
// matches the physical table's key.
String sinkDDL = "CREATE TABLE sink (" +
        " id INT NOT NULL," +
        " name STRING," +
        " description STRING," +
        " weight Double," +
        " PRIMARY KEY (id,name, description,weight) NOT ENFORCED " +
        ") WITH (" +
        " 'connector' = 'jdbc'," +
        " 'url' = 'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," +
        " 'table-name' = 'products'," +
        " 'driver'= 'com.mysql.cj.jdbc.Driver'," +
        " 'username'='DataPip'," +
        " 'password'='DataPip'" +
        ")";

// GROUP BY over every selected column turns the changelog into an upsert stream
// keyed the same way as the sink's primary key.
String dml = "INSERT INTO sink SELECT id,name ,description, weight " +
        "FROM debezium_source GROUP BY id,name ,description, weight";

tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
tEnv.executeSql(dml);
--
Sent from: http://apache-flink.147419.n8.nabble.com/