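Hi all, I'd like to ask about a problem I'm seeing with Flink 1.15.0 consuming from Kafka: offset commits fail. Some jobs appear to fail to commit every time. The data is consumed, but the committed offset never changes. How should this be handled?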
Symptoms:
1. The jobs report no exceptions.
2. Data is consumed and processed normally, so use of the data is not affected.
3. Checkpointing is configured to run every few minutes; in theory, offsets are committed when a checkpoint is executed.
4. Offset commits to Kafka fail for some jobs and succeed for others.

The WARN logs are as follows:

2022-06-27 01:07:42,725 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=11398 (max part counter=1).
2022-06-27 01:07:42,830 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 received completion notification for checkpoint with id=11398.
2022-06-27 01:07:43,820 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=11476 (max part counter=0).
2022-06-27 01:07:43,946 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 received completion notification for checkpoint with id=11476.
2022-06-27 01:07:45,218 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=11521 (max part counter=47).
2022-06-27 01:07:45,290 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 received completion notification for checkpoint with id=11521.
2022-06-27 01:07:45,521 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 11443
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
2022-06-27 01:07:45,990 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 11398
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.

Thanks~
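
P.S. To tell whether a job fails on every checkpoint or only occasionally, the failed checkpoint IDs can be pulled out of the TaskManager log and compared with the checkpoints that completed. Below is a minimal sketch in plain Java, assuming the WARN message keeps the exact wording shown in the log above. `CommitFailureScan` and `failedCheckpoints` are hypothetical names made up for illustration; they are not part of Flink or Kafka.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not a Flink API): scans log lines for failed offset commits.
class CommitFailureScan {
    // Assumption: the message text matches the KafkaSourceReader WARN line in the log above.
    private static final Pattern FAILED_COMMIT =
            Pattern.compile("Failed to commit consumer offsets for checkpoint (\\d+)");

    // Returns the checkpoint IDs whose offset commit failed, in the order they appear.
    static List<Long> failedCheckpoints(List<String> logLines) {
        List<Long> ids = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = FAILED_COMMIT.matcher(line);
            if (m.find()) {
                ids.add(Long.parseLong(m.group(1)));
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        // Three lines copied from the log above; a real run would read the log file instead.
        List<String> sample = List.of(
                "2022-06-27 01:07:45,290 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 received completion notification for checkpoint with id=11521.",
                "2022-06-27 01:07:45,521 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 11443",
                "2022-06-27 01:07:45,990 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 11398");
        System.out.println(failedCheckpoints(sample)); // prints [11443, 11398]
    }
}
```

If every checkpoint ID of a given job shows up in this list, the commit is failing persistently for that job rather than intermittently.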