
Maybe you can write it like this:
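
A minimal sketch, assuming the property key commit.offsets.on.checkpoint quoted from the Flink 1.14 documentation further down in this thread; whether the 1.12 KafkaSource honors that key is not confirmed here. The class name, the helper method, and the group id are hypothetical and only for illustration. The builder call is left as a comment because it needs the Flink Kafka connector on the classpath; setProperties(Properties) and setProperty(String, String) are the builder methods named in the quoted documentation.

```java
import java.util.Properties;

public class KafkaSourceCommitConfig {
    // Property key as quoted from the Flink 1.14 KafkaSource documentation.
    static final String COMMIT_OFFSETS_ON_CHECKPOINT = "commit.offsets.on.checkpoint";

    // Hypothetical helper: collects the properties to hand to the KafkaSource builder.
    static Properties buildSourceProperties(String groupId, boolean commitOnCheckpoint) {
        Properties props = new Properties();
        // Offsets are committed per consumer group, so a group id is set as well.
        props.setProperty("group.id", groupId);
        props.setProperty(COMMIT_OFFSETS_ON_CHECKPOINT, Boolean.toString(commitOnCheckpoint));
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildSourceProperties("my-group", true);
        // With the connector available, the properties would be passed along the lines of:
        //   builder.setProperties(props)
        // or, for the single key:
        //   builder.setProperty(COMMIT_OFFSETS_ON_CHECKPOINT, "true")
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Checkpointing still has to be enabled on the job for a commit-on-checkpoint setting to have any effect, as the Javadoc quoted below says for the old consumer.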

Other additional properties can be found here:

Marco Villalobos <mvillalo...@kineteque.com> wrote on Tue, Nov 30, 2021 at 11:08 AM:

> Thank you for the information.  That still does not answer my question,
> though.  How do I configure Flink 1.12, using the KafkaSourceBuilder, so
> that the consumer commits offsets back to Kafka on checkpoints?
> FlinkKafkaConsumer has the method setCommitOffsetsOnCheckpoints(boolean) for this.
> But now that I am using KafkaSourceBuilder, how do I configure that
> behavior so that offsets get committed on checkpoints?  Or is that the
> default behavior with checkpoints?
> -Marco
> On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>> Hi!
>> The Flink 1.14 release notes cover this. See [1].
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>> Marco Villalobos <mvillalo...@kineteque.com> wrote on Tue, Nov 30, 2021 at 7:12 AM:
>>> Hi everybody,
>>> I am using Flink 1.12 and migrating my code from using
>>> FlinkKafkaConsumer to using the KafkaSourceBuilder.
>>> FlinkKafkaConsumer has the method
>>>> /**
>>>>  * Specifies whether or not the consumer should commit offsets back to Kafka on checkpoints.
>>>>  * This setting will only have effect if checkpointing is enabled for the job. If checkpointing isn't
>>>>  * enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+) property
>>>>  * settings will be used.
>>>>  */
>>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
>>> How do I set up that parameter when using the KafkaSourceBuilder? If I
>>> already have checkpointing configured, is it necessary to set up "commit
>>> offsets on checkpoints"?
>>> The Flink 1.12 documentation does not discuss this topic, and the Flink
>>> 1.14 documentation says little about it.
>>>  For example, the Flink 1.14 documentation states:
>>>> Additional Properties
>>>> In addition to properties described above, you can set arbitrary
>>>> properties for KafkaSource and KafkaConsumer by using
>>>> setProperties(Properties) and setProperty(String, String). KafkaSource has
>>>> following options for configuration:
>>>> commit.offsets.on.checkpoint specifies whether to commit consuming
>>>> offsets to Kafka brokers on checkpoint
>>> And the 1.12 documentation states:
>>>> With Flink’s checkpointing enabled, the Flink Kafka Consumer will
>>>> consume records from a topic and periodically checkpoint all its Kafka
>>>> offsets, together with the state of other operations. In case of a job
>>>> failure, Flink will restore the streaming program to the state of the
>>>> latest checkpoint and re-consume the records from Kafka, starting from the
>>>> offsets that were stored in the checkpoint.
>>>> The interval of drawing checkpoints therefore defines how much the
>>>> program may have to go back at most, in case of a failure. To use fault
>>>> tolerant Kafka Consumers, checkpointing of the topology needs to be enabled
>>>> in the job.
>>>> If checkpointing is disabled, the Kafka consumer will periodically
>>>> commit the offsets to Zookeeper.
>>> Thank you.
>>> Marco