[ 
https://issues.apache.org/jira/browse/KAFKA-13840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17525919#comment-17525919
 ] 

Kyle R Stehbens edited comment on KAFKA-13840 at 4/21/22 6:07 PM:
------------------------------------------------------------------

These are some of the (redacted) logs from when the issue starts occurring.

Note we have flink check pointing setup for every 30s, hence the calls to 
commitAsync() are attempted every 30s.

First the timeout is what triggers the issue (Caused because our brokers are 
likely too busy and we sometimes get a timeout once every few hours).

>From then on forward, any further commit attempt results in 
>CoordinatorNotAvailableException.

 
2022-04-18 05:10:14,006 INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer 
clientId=auxiliary-6, groupId=writer.main.auxiliary] Group coordinator 
kaf9.mycompany.com:9092 (id: 2147482632 rack: null) is unavailable or invalid 
due to cause: error response REQUEST_TIMED_OUT.isDisconnected: false. 
Rediscovery will be attempted.
 
2022-04-18 05:10:40,713 WARN 
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to 
commit consumer offsets for checkpoint 13149 
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets. Caused by: 
org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.
 
2022-04-18 05:11:10,890 WARN 
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to 
commit consumer offsets for checkpoint 13150 
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets. Caused by: 
org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.
 
2022-04-18 05:11:41,051 WARN 
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to 
commit consumer offsets for checkpoint 13151 
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets. Caused by: 
org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.


was (Author: kyle.stehbens):
These are some of the (redacted) logs from when the issue starts occurring.

Note we have flink check pointing setup for every 30s, hence the calls to 
commitAsync() are attempted every 30s.

First the timeout is what triggers the issue (Caused because our brokers are 
likely too busy and we sometimes get a timeout once every few hours).

>From then on forward, any further commit attempt results in 
>CoordinatorNotAvailableException.

 
2022-04-18 05:10:14,006 INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer 
clientId=auxiliary-6, groupId=writer.main.auxiliary] Group coordinator 
kaf9.mycompamy.com:9092 (id: 2147482632 rack: null) is unavailable or invalid 
due to cause: error response REQUEST_TIMED_OUT.isDisconnected: false. 
Rediscovery will be attempted.
 
2022-04-18 05:10:40,713 WARN 
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to 
commit consumer offsets for checkpoint 13149 
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets. Caused by: 
org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.
 
2022-04-18 05:11:10,890 WARN 
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to 
commit consumer offsets for checkpoint 13150 
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets. Caused by: 
org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.
 
2022-04-18 05:11:41,051 WARN 
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to 
commit consumer offsets for checkpoint 13151 
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets. Caused by: 
org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.

> KafkaConsumer is unable to recover connection to group coordinator after 
> commitOffsetsAsync exception
> -----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13840
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13840
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 2.6.1, 3.1.0, 2.7.2, 2.8.1, 3.0.0
>            Reporter: Kyle R Stehbens
>            Assignee: Luke Chen
>            Priority: Major
>
> Hi, I've discovered an issue with the java Kafka client (consumer) whereby a 
> timeout or any other retry-able exception triggered during an async offset 
> commit, renders the client unable to recover its group co-coordinator and 
> leaves the client in a broken state.
>  
> I first encountered this using v2.8.1 of the java client, and after going 
> through the code base for all versions of the client, have found it affects 
> all versions of the client from 2.6.1 onward.
> I also confirmed that by rolling back to 2.5.1, the issue is not present.
>  
> The issue stems from changes to how the FindCoordinatorResponseHandler in 
> 2.5.1 used to call clearFindCoordinatorFuture(); on both success and failure 
> here:
> [https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L783]
>  
> In all future version of the client this call is not made:
> [https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L838]
>  
> What this results in, is when the KafkaConsumer makes a call to 
> coordinator.commitOffsetsAsync(...), if an error occurs such that the 
> coordinator is unavailable here:
> [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1007]
>  
> then the client will try call:
> [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1017]
> However this will never be able to succeed as it perpetually returns a 
> reference to a failed future: findCoordinatorFuture that is never cleared out.
>  
> This manifests in all future calls to commitOffsetsAsync() throwing a 
> "coordinator unavailable" exception forever going forward after any 
> retry-able exception causes the coordinator to close. 
> Note we discovered this when we upgraded the kafka client in our Flink 
> consumers from 2.4.1 to 2.8.1 and subsequently needed to downgrade the 
> client. We noticed this occurring in our non-flink java consumers too running 
> 3.x client versions.
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to