kirktrue commented on code in PR #16686:
URL: https://github.com/apache/kafka/pull/16686#discussion_r1823647737


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1781,30 +1776,54 @@ private boolean processBackgroundEvents() {
     }
 
     /**
-     * This method can be used by cases where the caller has an event that needs to both block for completion but
-     * also process background events. For some events, in order to fully process the associated logic, the
-     * {@link ConsumerNetworkThread background thread} needs assistance from the application thread to complete.
-     * If the application thread simply blocked on the event after submitting it, the processing would deadlock.
-     * The logic herein is basically a loop that performs two tasks in each iteration:
-     *
-     * <ol>
-     *     <li>Process background events, if any</li>
-     *     <li><em>Briefly</em> wait for {@link CompletableApplicationEvent an event} to complete</li>
-     * </ol>
+     * When unsubscribing, the application thread enqueues an {@link UnsubscribeEvent} on the application event queue.
+     * That event will eventually trigger the rebalancing logic in the background thread.
+     * Critically, as part of this rebalancing work, any
+     * {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection) rebalance listener callback} may need to be
+     * invoked.
      *
      * <p/>
      *
-     * Each iteration gives the application thread an opportunity to process background events, which may be
-     * necessary to complete the overall processing.
+     * There are a number of challenges that arise during this seemingly simple process:
      *
-     * <p/>
+     * <ul>
+     *     <li>
+     *         While most of the unsubscribe logic is performed on the background thread, the
+     *         {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection) rebalance listener callback} must be
+     *         executed on the application thread. This requires a delicate dance between the two threads that is
+     *         orchestrated via the {@link ConsumerRebalanceListenerCallbackNeededEvent} and
+     *         {@link ConsumerRebalanceListenerCallbackCompletedEvent} events.
+     *     </li>
+     *     <li>
+     *         The user may or may not be using a callback handler. This can be deduced by
+     *         {@link SubscriptionState#rebalanceListener() checking for rebalance listener}.
+     *     </li>
+     *     <li>
+     *         If, at the time of unsubscribe, the consumer does not have any partitions assigned, the background
+     *         thread will <em>not</em> enqueue a {@link ConsumerRebalanceListenerCallbackNeededEvent} to signal the
+     *         application thread to execute the
+     *         {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection) rebalance listener callback}. It's
+     *         technically possible to
+     *         {@link SubscriptionState#assignedPartitions() check if there are any partitions assigned} in the
+     *         application thread, but it's possible the assignment could change in the background thread. So the
+     *         application thread cannot blindly assume that a {@link ConsumerRebalanceListenerCallbackNeededEvent}
+     *         will appear in the background event queue even if a rebalance listener is in use.
+     *     </li>
+     *     <li>
+     *         The call to {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} may take so long that
+     *         it exhausts the user-supplied {@link Timer} for the unsubscribe operation. This would lead to the
+     *         application thread throwing a timeout before the {@link UnsubscribeEvent} is completed successfully.
+     *     </li>
+     *     <li>
+     *         If, prior to the unsubscribe operation, the application thread was interrupted, it's important that
+     *         the interrupt flag be preserved for the
+     *         {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection) rebalance listener callback} to

Review Comment:
   Same as above... removed.



