[ https://issues.apache.org/jira/browse/KAFKA-13840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17525842#comment-17525842 ]
Kyle R Stehbens commented on KAFKA-13840:
-----------------------------------------

Thanks [~showuon]!

In the particular case I was investigating, we are using the KafkaSource (with checkpointing enabled) in Flink as our wrapper around the Kafka client. I'm fairly sure it's calling consumer.assign() in order to start from checkpointed offsets rather than from the group's committed ones, though it is also working in group mode (the missing commits and progress on each partition in the group are how we noticed this issue).

I was also confused as to why the heartbeat wasn't clearing out the failed future reference as well, and I think it may be due to a subtle change in the heartbeat logic here:
[https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1273]
vs. here:
[https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1367]

Possibly, beforehand the logic only checked whether state != MemberState.STABLE, allowing the code to fall through to this check:
[https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1284]
which would have cleared the future on failure, since the handler there contains the logic to do so. The new logic will likely hit a continue; statement further up, never allowing execution to reach here:
[https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1379]

I'll see if I can surface some logs of this happening, but in the meantime the general pattern was: a timeout (a retriable exception) from our broker when calling commitAsync() caused the coordinator to be marked as unavailable. The next commitAsync() call, and every one after it, resulted in "coordinator unavailable" exceptions being logged.

> KafkaConsumer is unable to recover connection to group coordinator after commitOffsetsAsync exception
> -----------------------------------------------------------------------------------------------------
>
> Key: KAFKA-13840
> URL: https://issues.apache.org/jira/browse/KAFKA-13840
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 2.6.1, 3.1.0, 2.7.2, 2.8.1, 3.0.0
> Reporter: Kyle R Stehbens
> Assignee: Luke Chen
> Priority: Major
>
> Hi, I've discovered an issue with the Java Kafka client (consumer) whereby a timeout or any other retriable exception triggered during an async offset commit renders the client unable to recover its group coordinator and leaves the client in a broken state.
>
> I first encountered this using v2.8.1 of the Java client, and after going through the code base for all versions of the client, found that it affects all versions of the client from 2.6.1 onward.
> I also confirmed that by rolling back to 2.5.1, the issue is not present.
>
> The issue stems from a change in the FindCoordinatorResponseHandler, which in 2.5.1 called clearFindCoordinatorFuture(); on both success and failure here:
> [https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L783]
>
> In all later versions of the client, this call is not made:
> [https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L838]
>
> As a result, when the KafkaConsumer calls coordinator.commitOffsetsAsync(...) and an error occurs such that the coordinator is unavailable here:
> [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1007]
>
> then the client will try to call:
> [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1017]
> However, this can never succeed, because it perpetually returns a reference to a failed future, findCoordinatorFuture, that is never cleared out.
>
> This manifests as every subsequent call to commitOffsetsAsync() throwing a "coordinator unavailable" exception, forever, once any retriable exception causes the coordinator to close.
> Note: we discovered this when we upgraded the Kafka client in our Flink consumers from 2.4.1 to 2.8.1, and subsequently had to downgrade the client. We have also seen this occurring in our non-Flink Java consumers running 3.x client versions.
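To make the failure mode in the description concrete, here is a minimal, pure-JDK model of it. It is a sketch under stated assumptions, not the real AbstractCoordinator or ConsumerCoordinator: the class and method names (CoordinatorLookupModel, Coordinator, lookupCoordinator, commitAsync), the clearFutureOnCompletion flag, and the simulated "first N lookups time out" broker are all hypothetical. The only behaviour it tries to reproduce is the one reported: the lookup is cached in a findCoordinatorFuture field, and whether a failed lookup can ever be retried depends on whether the response handler clears that field on failure as well as on success.

```java
import java.util.concurrent.CompletableFuture;

public class CoordinatorLookupModel {

    /** Hypothetical, heavily simplified stand-in for the consumer's coordinator; NOT kafka-clients code. */
    static final class Coordinator {
        private final boolean clearFutureOnCompletion; // true ~ 2.5.1-style handler, false ~ 2.6.1+ as reported
        private final int failingLookups;              // simulated broker: the first N lookups time out
        private CompletableFuture<String> findCoordinatorFuture; // cached lookup, named after the field in the report
        private String coordinator;                    // null means "coordinator unknown"
        private int lookupsSent = 0;

        Coordinator(boolean clearFutureOnCompletion, int failingLookups) {
            this.clearFutureOnCompletion = clearFutureOnCompletion;
            this.failingLookups = failingLookups;
        }

        int lookupsSent() {
            return lookupsSent;
        }

        /** Reuses the cached lookup if present; otherwise "sends" a new (simulated) FindCoordinator request. */
        CompletableFuture<String> lookupCoordinator() {
            if (findCoordinatorFuture != null) {
                return findCoordinatorFuture; // may be a future that has already failed
            }
            lookupsSent++;
            CompletableFuture<String> future = new CompletableFuture<>();
            findCoordinatorFuture = future; // cache before completing so the handler below can clear it
            if (clearFutureOnCompletion) {
                // Models a handler that clears the cache on success AND failure.
                future.whenComplete((node, error) -> findCoordinatorFuture = null);
            }
            // Simulated response: time out the first `failingLookups` requests, then succeed.
            if (lookupsSent <= failingLookups) {
                future.completeExceptionally(new RuntimeException("FindCoordinator timed out"));
            } else {
                future.complete("broker-1");
            }
            return future;
        }

        /** Returns true if the commit could be sent, false if it fails with "coordinator unavailable". */
        boolean commitAsync() {
            if (coordinator == null) {
                CompletableFuture<String> lookup = lookupCoordinator();
                if (lookup.isCompletedExceptionally()) {
                    return false;
                }
                coordinator = lookup.join();
            }
            return true;
        }
    }

    public static void main(String[] args) {
        for (boolean clear : new boolean[] {true, false}) {
            Coordinator c = new Coordinator(clear, 1);
            StringBuilder results = new StringBuilder();
            for (int i = 0; i < 3; i++) {
                results.append(c.commitAsync() ? "ok " : "fail ");
            }
            String label = clear ? "cleared on completion: " : "never cleared:         ";
            System.out.println(label + results.toString().trim() + " (lookups sent: " + c.lookupsSent() + ")");
        }
        // cleared on completion: fail ok ok (lookups sent: 2)
        // never cleared:         fail fail fail (lookups sent: 1)
    }
}
```

With the cache cleared on completion (the 2.5.1-style handler), the second commit triggers a fresh lookup and recovers. Without it, every later commit receives the same already-failed future and no new lookup is ever sent, which matches the "coordinator unavailable forever" symptom described above.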