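cancel with savepoint is the older interface and can produce duplicate data in Kafka. The newer stop with savepoint no longer emits data while the savepoint is being taken, which avoids the duplicates. For details, see https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/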
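
For illustration, a stop-with-savepoint call for the job quoted below might look roughly like this. It is only a sketch: the application ID, job ID, and savepoint directory are copied from the cancel command in the quoted message, `-p` is the short form of `--savepointPath` in the Flink CLI, and the `FLINK_BIN` and `DRY_RUN` variables and the `build_stop_cmd` helper are names made up for this example. By default it only prints the command instead of running it.

```shell
#!/bin/sh
# Sketch: stop the job with a savepoint instead of cancelling it.
# The IDs and paths are taken from the quoted cancel command; adjust as needed.
# FLINK_BIN, DRY_RUN and build_stop_cmd are hypothetical names for this example.
FLINK_BIN="${FLINK_BIN:-/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink}"
YARN_APP_ID="application_1625497885855_698371"
JOB_ID="59cf6ccc83aa163bd1e0cd3304dfe06a"
SAVEPOINT_DIR="hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint"

# Print the full command line so it can be reviewed before it is run.
build_stop_cmd() {
  printf '%s stop -p %s -yid %s %s\n' \
    "$FLINK_BIN" "$SAVEPOINT_DIR" "$YARN_APP_ID" "$JOB_ID"
}

# DRY_RUN=1 (the default here) only prints the command; DRY_RUN=0 executes it.
if [ "${DRY_RUN:-1}" = "1" ]; then
  build_stop_cmd
else
  "$FLINK_BIN" stop -p "$SAVEPOINT_DIR" -yid "$YARN_APP_ID" "$JOB_ID"
fi
```

The savepoint path printed by `flink stop` can then be passed to `flink run -s` in the same way as in the restart command quoted below.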

Jim Chen <chenshuai19950...@gmail.com> wrote on Mon, Aug 2, 2021 at 2:33 PM:

> I restarted the job from a savepoint, using the following commands:
>
> cancel command:
>
> /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
>   -yid application_1625497885855_698371 \
>   -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint \
>   59cf6ccc83aa163bd1e0cd3304dfe06a
>
> print savepoint:
>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
>
> restart command:
>
> /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
>   -m yarn-cluster \
>   -yjm 4096 -ytm 4096 \
>   -ynm User_Click_Log_Split_All \
>   -yqu syh_offline \
>   -ys 2 \
>   -d \
>   -p 64 \
>   -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494 \
>   -n \
>   -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
>   /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
>
> Jim Chen <chenshuai19950...@gmail.com> wrote on Mon, Aug 2, 2021 at 2:01 PM:
>
> > Hi all, I have a Flink job that consumes Kafka topic A and writes to Kafka topic B.
> > After restarting the job from a savepoint, I found duplicated records in topic B. Could anyone help me figure out why? Thanks!
> >
> > My Versions
> > Flink 1.12.4
> > Kafka 2.0.1
> > Java 1.8
> >
> > Core code:
> >
> > env.enableCheckpointing(300000);
> > env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> >
> > DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
> >
> > tableEnv.createTemporaryView("data_table",dataDS);
> > String sql = "select * from data_table a inner join hive_catalog.dim.dim.project for system_time as of a.proctime as b on a.id = b.id"
> > Table table = tableEnv.sqlQuery(sql);
> > DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);
> >
> > // Kafka producer parameter
> > Properties producerProps = new Properties();
> > producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
> > producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
> > producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaBufferMemory);
> > producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
> > producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
> > producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 300000);
> > producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
> > producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
> > producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
> > producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
> >
> > resultDS.addSink(new FlinkKafkaProducer<JSONObject>(sinkTopic, new JSONSchema(),
> >         producerProps, new FlinkFixedPartitioner<>(),
> >         FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
> >     .setParallelism(sinkParallelism);