chia7712 commented on code in PR #16686:
URL: https://github.com/apache/kafka/pull/16686#discussion_r1769232359


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1291,6 +1294,10 @@ private void releaseAssignmentAndLeaveGroup(final Timer timer) {
             log.warn("Consumer triggered an unsubscribe event to leave the group but couldn't " +
                 "complete it within {} ms. It will proceed to close.", timer.timeoutMs());
         } finally {
+            // Regardless of success or failure of the unsubscribe process, it's important to process any background
+            // events in the hope that our ConsumerRebalanceListenerCallbackNeededEvent is present and can be executed.
+            processBackgroundEvents();

Review Comment:
   IIRC, #16974 ensures `pollOnClose` returns the `LEAVE_GROUP` request even 
when the callback is NOT executed, so why do we need to process all background 
events? Or is the purpose to make sure the callback is executed even when the 
timeout is small?
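
To make the second reading concrete, here is a minimal sketch of draining queued events in a `finally` block so that a pending callback still runs after a short timeout expires. It is not the `AsyncKafkaConsumer` code: `DrainOnCloseSketch`, `enqueueCallback`, `releaseAndLeave` and the `log` list are hypothetical names, and the queue of `Runnable`s is only an assumed stand-in for the real background event queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; none of these names come from Kafka.
class DrainOnCloseSketch {
    // Assumed stand-in for the queue a background thread uses to hand work to the application thread.
    private final BlockingQueue<Runnable> backgroundEvents = new LinkedBlockingQueue<>();
    // Records what happened, in order, so the behavior can be inspected.
    final List<String> log = new ArrayList<>();

    // The background side asks the application thread to run a listener callback.
    void enqueueCallback(String name) {
        backgroundEvents.add(() -> log.add("callback:" + name));
    }

    // Runs every queued event on the calling thread and returns how many ran.
    int processBackgroundEvents() {
        List<Runnable> drained = new ArrayList<>();
        backgroundEvents.drainTo(drained);
        drained.forEach(Runnable::run);
        return drained.size();
    }

    // Waits for the unsubscribe to finish, but drains pending events whatever the outcome.
    void releaseAndLeave(CompletableFuture<Void> unsubscribe, long timeoutMs) {
        try {
            unsubscribe.get(timeoutMs, TimeUnit.MILLISECONDS);
            log.add("unsubscribed");
        } catch (TimeoutException e) {
            log.add("timeout");
        } catch (Exception e) {
            log.add("failed");
        } finally {
            // Without this call, a callback queued before the timeout would never run.
            processBackgroundEvents();
        }
    }

    public static void main(String[] args) {
        DrainOnCloseSketch sketch = new DrainOnCloseSketch();
        sketch.enqueueCallback("onPartitionsRevoked");
        // The future never completes, so the 1 ms wait times out.
        sketch.releaseAndLeave(new CompletableFuture<>(), 1);
        System.out.println(sketch.log); // prints [timeout, callback:onPartitionsRevoked]
    }
}
```

In this sketch the callback runs only because of the `finally` block. Whether the real close path needs the same guarantee, given #16974, is exactly the question above.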
   
   



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1227,13 +1227,14 @@ public void close(Duration timeout) {
 
     private void close(Duration timeout, boolean swallowException) {
         log.trace("Closing the Kafka consumer");
+        boolean wasInterrupted = Thread.interrupted();

Review Comment:
   The classic consumer does not reset the interrupted status, so could this 
introduce a behavior change for the consumer listener? For example, the 
consumer listener CAN'T see the interrupt anymore?
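
For reference, a small standalone sketch of the JDK behavior behind this concern: `Thread.interrupted()` returns the flag and clears it, while `Thread.currentThread().isInterrupted()` only reads it. The class and method names are hypothetical, and the "listener" is modeled as nothing more than a later `isInterrupted()` check on the same thread.

```java
// Hypothetical demo class; it only exercises standard java.lang.Thread methods.
class InterruptFlagDemo {

    // Mirrors the added line: the flag is read with Thread.interrupted(), which also clears it.
    static boolean listenerSeesInterruptAfterClearing() {
        Thread.currentThread().interrupt();             // simulate an interrupt arriving before close()
        boolean wasInterrupted = Thread.interrupted();  // returns true and resets the flag
        // What code running later on this thread (e.g. a listener callback) would observe:
        boolean seenByListener = Thread.currentThread().isInterrupted();
        return wasInterrupted && seenByListener;
    }

    // Baseline: if the flag is never cleared, later code on the thread still sees it.
    static boolean listenerSeesInterruptWithoutClearing() {
        Thread.currentThread().interrupt();
        boolean seenByListener = Thread.currentThread().isInterrupted(); // read-only check
        Thread.interrupted(); // clean up so the demo leaves the thread uninterrupted
        return seenByListener;
    }

    public static void main(String[] args) {
        System.out.println(listenerSeesInterruptAfterClearing());   // prints false
        System.out.println(listenerSeesInterruptWithoutClearing()); // prints true
    }
}
```

If the PR restores the flag later from `wasInterrupted` (not visible in this hunk), callbacks invoked before that point would still run with the flag cleared.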



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

