Hi,

You could set it like this:
builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "true");

The other additional properties are listed here:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties
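
For reference, here is a minimal sketch of the same setting passed through setProperties(Properties) instead of setProperty(String, String); both methods are named in the 1.14 documentation quoted further down. It uses only java.util.Properties, so it runs without Flink on the classpath. The class name, the helper method, and the "my-group" group id are hypothetical, and the commented-out builder call assumes the KafkaSource connector is available in your project.

```java
import java.util.Properties;

public class KafkaSourceCommitProps {
    // Option key as quoted from the Flink 1.14 "Additional Properties" docs.
    static final String COMMIT_OFFSETS_ON_CHECKPOINT = "commit.offsets.on.checkpoint";

    // Builds the properties to hand to the source builder.
    // The group id value is a placeholder; use your own consumer group.
    static Properties commitOnCheckpointProps(String groupId) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        props.setProperty(COMMIT_OFFSETS_ON_CHECKPOINT, "true");
        return props;
    }

    public static void main(String[] args) {
        Properties props = commitOnCheckpointProps("my-group");
        System.out.println(COMMIT_OFFSETS_ON_CHECKPOINT + "="
                + props.getProperty(COMMIT_OFFSETS_ON_CHECKPOINT));

        // Assumed usage with the Flink Kafka connector on the classpath:
        // KafkaSource.<String>builder().setProperties(props) ...
    }
}
```

The string key is the same one that KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key() is expected to return, so either form should configure the same option.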

On Tue, Nov 30, 2021 at 11:08 AM Marco Villalobos <mvillalo...@kineteque.com> wrote:

> Thank you for the information.  That still does not answer my question
> though.  How do I configure Flink 1.12, using the KafkaSourceBuilder, so
> that the consumer commits offsets back to Kafka on checkpoints?
>
> FlinkKafkaConsumer has a method for this: setCommitOffsetsOnCheckpoints(boolean).
>
> But now that I am using KafkaSourceBuilder, how do I configure that
> behavior so that offsets get committed on checkpoints?  Or is that the
> default behavior with checkpoints?
>
> -Marco
>
> On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi!
>>
>> The Flink 1.14 release notes cover this. See [1].
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>>
>> On Tue, Nov 30, 2021 at 7:12 AM Marco Villalobos <mvillalo...@kineteque.com> wrote:
>>
>>> Hi everybody,
>>>
>>> I am using Flink 1.12 and migrating my code from using
>>> FlinkKafkaConsumer to using the KafkaSourceBuilder.
>>>
>>> FlinkKafkaConsumer has the method
>>>
>>>> /**
>>>>  * Specifies whether or not the consumer should commit offsets back to Kafka on checkpoints.
>>>>  * This setting will only have effect if checkpointing is enabled for the job. If checkpointing isn't
>>>>  * enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+) property
>>>>  * settings will be used.
>>>>  */
>>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
>>>
>>>
>>> How do I set up that parameter when using the KafkaSourceBuilder? If I
>>> already have checkpointing configured, is it necessary to set up "commit
>>> offsets on checkpoints"?
>>>
>>> The Flink 1.12 documentation does not discuss this topic, and the Flink
>>> 1.14 documentation says little about it.
>>>
>>> For example, the Flink 1.14 documentation states:
>>>
>>>> Additional Properties
>>>> In addition to properties described above, you can set arbitrary
>>>> properties for KafkaSource and KafkaConsumer by using
>>>> setProperties(Properties) and setProperty(String, String). KafkaSource has
>>>> following options for configuration:
>>>> commit.offsets.on.checkpoint specifies whether to commit consuming
>>>> offsets to Kafka brokers on checkpoint
>>>
>>>
>>> And the 1.12 documentation states:
>>>
>>>> With Flink’s checkpointing enabled, the Flink Kafka Consumer will
>>>> consume records from a topic and periodically checkpoint all its Kafka
>>>> offsets, together with the state of other operations. In case of a job
>>>> failure, Flink will restore the streaming program to the state of the
>>>> latest checkpoint and re-consume the records from Kafka, starting from the
>>>> offsets that were stored in the checkpoint.
>>>> The interval of drawing checkpoints therefore defines how much the
>>>> program may have to go back at most, in case of a failure. To use fault
>>>> tolerant Kafka Consumers, checkpointing of the topology needs to be enabled
>>>> in the job.
>>>> If checkpointing is disabled, the Kafka consumer will periodically
>>>> commit the offsets to Zookeeper.
>>>
>>>
>>> Thank you.
>>>
>>> Marco
>>>
>>>
>>>
