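cancel with savepoint is the older interface, and it can cause duplicate data in Kafka. The newer stop with
savepoint stops emitting data while the savepoint is being taken, which avoids the duplicates. For details, see
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/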
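
As a rough sketch, assuming the same Flink binary, YARN application ID, job ID, and savepoint directory as in the quoted cancel command, a stop-with-savepoint call could be assembled as follows. The `build_stop_cmd` helper is hypothetical and only prints the command so it can be reviewed before running. Whether `flink stop` accepts `-yid` and `--savepointPath` in this form is an assumption to verify with `flink stop --help` on your version.

```shell
#!/usr/bin/env bash
# Hypothetical helper: prints a stop-with-savepoint command (dry run only,
# nothing is executed against the cluster).
# Arguments: <flink_bin> <yarn_app_id> <savepoint_dir> <job_id>
build_stop_cmd() {
  local flink_bin="$1" app_id="$2" savepoint_dir="$3" job_id="$4"
  printf '%s stop -yid %s --savepointPath %s %s\n' \
    "$flink_bin" "$app_id" "$savepoint_dir" "$job_id"
}

# Values copied from the quoted cancel command.
build_stop_cmd \
  /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink \
  application_1625497885855_698371 \
  hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint \
  59cf6ccc83aa163bd1e0cd3304dfe06a
```

The printed command can then be run by hand in place of `flink cancel -s`; the restart command stays the same and points `-s` at the savepoint path that the stop call reports.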

On Mon, Aug 2, 2021 at 2:33 PM, Jim Chen <chenshuai19950...@gmail.com> wrote:

> I restarted the job from a savepoint, using the following commands:
>
> cancel command:
>
> /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
> -yid application_1625497885855_698371 \
> -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint \
> 59cf6ccc83aa163bd1e0cd3304dfe06a
>
> printed savepoint path:
>
>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
>
>
> restart command:
>
> /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> -m yarn-cluster \
> -yjm 4096 -ytm 4096 \
> -ynm User_Click_Log_Split_All \
> -yqu syh_offline \
> -ys 2 \
> -d \
> -p 64 \
> -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494 \
> -n \
> -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
> /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
>
> On Mon, Aug 2, 2021 at 2:01 PM, Jim Chen <chenshuai19950...@gmail.com> wrote:
>
> > Hi all, I have a Flink job that consumes Kafka topic A and writes to Kafka topic B.
> > After restarting the job from a savepoint, I found duplicated data in topic B. Could anyone help explain why? Thanks!
> >
> > My Versions
> > Flink 1.12.4
> > Kafka 2.0.1
> > Java 1.8
> >
> > Core code:
> >
> > env.enableCheckpointing(300000);
> >
> > env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >
> > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> >
> > DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
> >
> > tableEnv.createTemporaryView("data_table", dataDS);
> > String sql = "select * from data_table a inner join "
> >         + "hive_catalog.dim.dim.project for system_time as of a.proctime as b "
> >         + "on a.id = b.id";
> > Table table = tableEnv.sqlQuery(sql);
> > DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);
> >
> > // Kafka producer parameter
> > Properties producerProps = new Properties();
> > producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
> > producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
> > producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaBufferMemory);
> > producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
> > producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
> > producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 300000);
> > producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
> > producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
> > producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
> > producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
> >
> > resultDS.addSink(new FlinkKafkaProducer<JSONObject>(sinkTopic, new JSONSchema(),
> >         producerProps, new FlinkFixedPartitioner<>(),
> >         FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
> >         .setParallelism(sinkParallelism);
> >
>
