[ https://issues.apache.org/jira/browse/FLINK-32822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhanghao Chen updated FLINK-32822: ---------------------------------- Description: When checkpointing is enabled, Flink Kafka connector commits the current consuming offset when checkpoints are *completed* although Kafka source does *NOT* rely on committed offsets for fault tolerance. When the checkpoint interval is long, the lag curve will behave in a zig-zag way: the lag will keep increasing, and suddenly drops on a complete checkpoint. It have led to some confusion for users as in [https://stackoverflow.com/questions/76419633/flink-kafka-source-commit-offset-to-error-offset-suddenly-increase-or-decrease] and may also affect external monitoring for setting up alarms (you'll have to set up with a high threshold due to the non-realtime commit of offsets) and autoscaling (the algorithm would need to pay extra effort to distinguish whether the backlog is actually growing or just because the checkpoint is not completed yet). Therefore, I think it is worthwhile to add an option to enable auto-commit of offsets when checkpoints is enabled. For DataStream API, it will be adding a configuration method. For Table API, it will be adding a new connector option which wires to the DataStream API configuration underneath. was: When checkpointing is enabled, Flink Kafka connector commits the current consuming offset when checkpoints are *completed* although ** Kafka source does *NOT* rely on committed offsets for fault tolerance. When the checkpoint interval is long, the lag curve will behave in a zig-zag way: the lag will keep increasing, and suddenly drops on a complete checkpoint. It have led to some confusion for users as in [https://stackoverflow.com/questions/76419633/flink-kafka-source-commit-offset-to-error-offset-suddenly-increase-or-decrease] and may also affect external monitoring for setting up alarms (you'll have to set up with a high threshold due to the non-realtime commit of offsets) and autoscaling (the algorithm would need to pay extra effort to distinguish whether the backlog is actually growing or just because the checkpoint is not completed yet). Therefore, I think it is worthwhile to add an option to enable auto-commit of offsets when checkpoints is enabled. For DataStream API, it will be adding a configuration method. For Table API, it will be adding a new connector option which wires to the DataStream API configuration underneath. > Add connector option to control whether to enable auto-commit of offsets when > checkpoints is enabled > ---------------------------------------------------------------------------------------------------- > > Key: FLINK-32822 > URL: https://issues.apache.org/jira/browse/FLINK-32822 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka > Reporter: Zhanghao Chen > Priority: Major > > When checkpointing is enabled, Flink Kafka connector commits the current > consuming offset when checkpoints are *completed* although Kafka source does > *NOT* rely on committed offsets for fault tolerance. When the checkpoint > interval is long, the lag curve will behave in a zig-zag way: the lag will > keep increasing, and suddenly drops on a complete checkpoint. It have led to > some confusion for users as in > [https://stackoverflow.com/questions/76419633/flink-kafka-source-commit-offset-to-error-offset-suddenly-increase-or-decrease] > and may also affect external monitoring for setting up alarms (you'll have > to set up with a high threshold due to the non-realtime commit of offsets) > and autoscaling (the algorithm would need to pay extra effort to distinguish > whether the backlog is actually growing or just because the checkpoint is not > completed yet). > Therefore, I think it is worthwhile to add an option to enable auto-commit of > offsets when checkpoints is enabled. For DataStream API, it will be adding a > configuration method. For Table API, it will be adding a new connector option > which wires to the DataStream API configuration underneath. > -- This message was sent by Atlassian Jira (v8.20.10#820010)