kirktrue commented on PR #16686:
URL: https://github.com/apache/kafka/pull/16686#issuecomment-2311438535

   @lianetm, I looked back at one of your previous comments and noticed your 
observation about the call to `Thread.interrupted()` in 
`ConsumerNetworkClient.poll()`. So yes, the classic consumer does check (and 
clear) the flag, and that error is thrown all the way up to 
`ClassicKafkaConsumer.close()`. However, `ClassicKafkaConsumer` suppresses the 
error (via its `swallow()` method). But I'm not seeing that the `Timer` object 
is updated, cleared, or reset, though.
   
   In addition, right after the coordinator is closed, the [classic consumer 
attempts to close the 
fetcher](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java#L1143-L1154).
 The fetcher closing code specifically deals with the timeout, but neither the 
comments nor the code itself refers to changing the timeout due to a previous 
interrupt.
   
   The classic consumer also appears to invoke the appropriate 
`ConsumerRebalanceListener` callback in `ConsumerCoordinator.onLeavePrepare()` 
before attempting to leave the group, specifically when the coordinator is 
closing. In an attempt to emulate that behavior, I added the additional call to 
`processBackgroundEvents()` in 
`AsyncKafkaConsumer.releaseAssignmentAndLeaveGroup()` to try to invoke the 
`ConsumerRebalanceListener` callback after the `UnsubscribeEvent` completes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to