[ https://issues.apache.org/jira/browse/FLINK-32822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhanghao Chen updated FLINK-32822:
----------------------------------
    Description: 
When checkpointing is enabled, the Flink Kafka connector commits the current 
consuming offsets only when checkpoints are *completed*, even though the Kafka 
source does *NOT* rely on committed offsets for fault tolerance. When the 
checkpoint interval is long, the lag curve follows a zig-zag pattern: the lag 
keeps increasing, then drops suddenly when a checkpoint completes. This has 
confused users, as in 
[https://stackoverflow.com/questions/76419633/flink-kafka-source-commit-offset-to-error-offset-suddenly-increase-or-decrease],
 and may also affect external monitoring used for alerting (alarm thresholds 
have to be set high because offsets are not committed in real time) and 
autoscaling (the algorithm has to make extra effort to distinguish whether the 
backlog is actually growing or the checkpoint simply has not completed yet).
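To illustrate the zig-zag effect, here is a toy Java model (not Flink code; the class CommitLagSimulation and its parameters are hypothetical). It assumes a constant produce rate and a consumer that is always fully caught up, so the true backlog is zero, and it computes the lag an external monitor would see as log end offset minus committed offset.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (NOT Flink code) of the lag an external monitor computes as
 * logEndOffset - committedOffset, for different commit intervals.
 */
public class CommitLagSimulation {

    /**
     * @param seconds          number of seconds to simulate
     * @param recordsPerSecond constant produce rate; the consumer is assumed to keep up exactly
     * @param commitIntervalS  how often offsets are committed (checkpoint or auto-commit interval)
     * @return the externally observed lag at the end of each second
     */
    static List<Long> observedLag(int seconds, long recordsPerSecond, int commitIntervalS) {
        List<Long> lag = new ArrayList<>();
        long committed = 0;
        for (int t = 1; t <= seconds; t++) {
            long logEnd = t * recordsPerSecond; // records produced so far
            long consumed = logEnd;             // assumption: consumer is fully caught up
            if (t % commitIntervalS == 0) {
                committed = consumed;           // commit happens at the end of this second
            }
            lag.add(logEnd - committed);        // what a lag monitor reports
        }
        return lag;
    }

    public static void main(String[] args) {
        // Offsets committed only when a checkpoint completes, checkpoint every 5 s.
        System.out.println("on checkpoint (5 s): " + observedLag(10, 100, 5));
        // Periodic auto-commit every 1 s.
        System.out.println("auto-commit (1 s):   " + observedLag(10, 100, 1));
    }
}
```

With a 5 s commit interval the reported lag climbs to 400 and falls back to 0 at every commit, although the consumer never falls behind; with a 1 s interval it stays at 0.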

Therefore, I think it is worthwhile to add an option to enable auto-commit of 
offsets when checkpointing is enabled. For the DataStream API, this means 
adding a configuration method. For the Table API, this means adding a new 
connector option that wires to the DataStream API configuration underneath.
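A minimal sketch of how the proposed wiring could look, assuming hypothetical names throughout: HypotheticalSourceBuilder stands in for the DataStream source builder, setAutoCommitWithCheckpointing is the proposed configuration method, and hypothetical.offset-auto-commit.enabled is a placeholder for the new Table API option. None of these exist in Flink. The enable.auto.commit and auto.commit.interval.ms keys are real Kafka consumer settings, and the properties.* pass-through mirrors how the Kafka table connector forwards client properties.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical sketch of the proposal; none of these names exist in Flink. */
public class AutoCommitWiringSketch {

    /** Stand-in for a DataStream source builder (NOT Flink's KafkaSourceBuilder). */
    static final class HypotheticalSourceBuilder {
        private final Properties props = new Properties();
        private boolean autoCommitWithCheckpointing = false; // default keeps today's behavior

        /** Hypothetical DataStream API configuration method proposed by the issue. */
        HypotheticalSourceBuilder setAutoCommitWithCheckpointing(boolean enabled) {
            this.autoCommitWithCheckpointing = enabled;
            return this;
        }

        HypotheticalSourceBuilder setProperty(String key, String value) {
            props.setProperty(key, value);
            return this;
        }

        /** Resolves the Kafka consumer properties the source would use. */
        Properties buildConsumerProperties(boolean checkpointingEnabled) {
            Properties resolved = new Properties();
            resolved.putAll(props);
            if (checkpointingEnabled && !autoCommitWithCheckpointing) {
                // Modeling assumption: offsets are committed only on checkpoint
                // completion, so Kafka's periodic auto-commit is switched off.
                resolved.setProperty("enable.auto.commit", "false");
            }
            return resolved;
        }
    }

    /** Placeholder for the new Table API connector option (hypothetical name). */
    static final String AUTO_COMMIT_OPTION = "hypothetical.offset-auto-commit.enabled";

    /** Table-side factory logic: forwards "properties.*" keys and the new option to the builder. */
    static HypotheticalSourceBuilder fromTableOptions(Map<String, String> options) {
        HypotheticalSourceBuilder builder = new HypotheticalSourceBuilder();
        for (Map.Entry<String, String> e : options.entrySet()) {
            if (e.getKey().startsWith("properties.")) {
                builder.setProperty(e.getKey().substring("properties.".length()), e.getValue());
            }
        }
        builder.setAutoCommitWithCheckpointing(
                Boolean.parseBoolean(options.getOrDefault(AUTO_COMMIT_OPTION, "false")));
        return builder;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("properties.enable.auto.commit", "true");
        options.put("properties.auto.commit.interval.ms", "1000");
        options.put(AUTO_COMMIT_OPTION, "true");
        Properties consumerProps = fromTableOptions(options).buildConsumerProperties(true);
        System.out.println(consumerProps.getProperty("enable.auto.commit"));
    }
}
```

Running main prints true. Without the hypothetical option, the sketch keeps the commit-on-checkpoint behavior by forcing enable.auto.commit to false while checkpointing is on; the Table option only flips the same builder flag, which is the "wires to the DataStream API configuration underneath" part of the proposal.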

 



> Add connector option to control whether to enable auto-commit of offsets when 
> checkpointing is enabled
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32822
>                 URL: https://issues.apache.org/jira/browse/FLINK-32822
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>            Reporter: Zhanghao Chen
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
