kirktrue commented on PR #16686: URL: https://github.com/apache/kafka/pull/16686#issuecomment-2311438535
@lianetm, I looked back at one of your previous comments and noticed your observation about the call to `Thread.interrupted()` in `ConsumerNetworkClient.poll()`. So yes, the classic consumer does check (and clear) the interrupt flag, and the resulting error is thrown all the way up to `ClassicKafkaConsumer.close()`. However, `ClassicKafkaConsumer` suppresses the error (via its `swallow()` method), and I'm not seeing that the `Timer` object is updated, cleared, or reset as a result.

In addition, right after the coordinator is closed, the [classic consumer attempts to close the fetcher](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java#L1143-L1154). The fetcher closing code specifically deals with the timeout, but neither the comments nor the code itself refers to changing the timeout due to a previous interrupt.

The classic consumer also appears to invoke the appropriate `ConsumerRebalanceListener` callback in `ConsumerCoordinator.onLeavePrepare()` before attempting to leave the group, specifically when the coordinator is closing. To emulate that behavior, I added a call to `processBackgroundEvents()` in `AsyncKafkaConsumer.releaseAssignmentAndLeaveGroup()` so that the `ConsumerRebalanceListener` callback is invoked after the `UnsubscribeEvent` completes.
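To make the interrupt point above concrete, here is a minimal, self-contained sketch of the pattern as I read it. It is not the Kafka code: `pollOnce()`, `swallow()`, `closeSequence()`, and `Result` are hypothetical stand-ins, the exception is a plain `IllegalStateException` rather than Kafka's own type, and the timeout is a bare `long` instead of a `Timer`. It only shows that `Thread.interrupted()` clears the flag, that a swallowing helper lets the close sequence continue, and that nothing in that path touches the timeout handed to the next step. Whatever the real exception class does with the flag is not modelled here.

```java
import java.util.concurrent.atomic.AtomicReference;

public class InterruptSwallowSketch {

    /** Hypothetical holder for what the sketch observed. */
    static final class Result {
        final boolean errorSwallowed;
        final boolean flagSetAfterCheck;
        final long timeoutSeenByFetcherClose;

        Result(boolean errorSwallowed, boolean flagSetAfterCheck, long timeoutSeenByFetcherClose) {
            this.errorSwallowed = errorSwallowed;
            this.flagSetAfterCheck = flagSetAfterCheck;
            this.timeoutSeenByFetcherClose = timeoutSeenByFetcherClose;
        }
    }

    /** Stand-in for the interrupt check made while polling (assumption, not Kafka code). */
    static void pollOnce() {
        // Thread.interrupted() returns the current flag and clears it in the same call.
        if (Thread.interrupted()) {
            throw new IllegalStateException("interrupted during poll");
        }
    }

    /** Hypothetical swallow helper: runs the action, records the first failure, never rethrows. */
    static void swallow(Runnable action, AtomicReference<Throwable> firstException) {
        try {
            action.run();
        } catch (Throwable t) {
            firstException.compareAndSet(null, t);
        }
    }

    /** Simulates "close coordinator, then close fetcher" with an interrupt pending. */
    static Result closeSequence(long timeoutMs) {
        AtomicReference<Throwable> firstException = new AtomicReference<>();
        long[] timeoutSeenByFetcherClose = {-1L};

        // Simulate an interrupt arriving before close() runs.
        Thread.currentThread().interrupt();

        // "Coordinator close": the poll throws, and the error is swallowed.
        swallow(InterruptSwallowSketch::pollOnce, firstException);
        boolean flagSetAfterCheck = Thread.currentThread().isInterrupted();

        // "Fetcher close": still runs, and receives the timeout unchanged.
        swallow(() -> timeoutSeenByFetcherClose[0] = timeoutMs, firstException);

        return new Result(firstException.get() != null, flagSetAfterCheck, timeoutSeenByFetcherClose[0]);
    }

    public static void main(String[] args) {
        Result r = closeSequence(30_000L);
        System.out.println("error swallowed: " + r.errorSwallowed);
        System.out.println("interrupt flag after check: " + r.flagSetAfterCheck);
        System.out.println("timeout seen by fetcher close: " + r.timeoutSeenByFetcherClose);
    }
}
```

In this sketch the second step runs with the full original timeout even though the first step was interrupted, which is the behavior I believe I'm seeing in the classic consumer's close path.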