kirktrue commented on code in PR #16686:
URL: https://github.com/apache/kafka/pull/16686#discussion_r1823645974


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1813,36 +1832,64 @@ private boolean processBackgroundEvents() {
      * execution of the rebalancing logic. The rebalancing logic cannot complete until the
      * {@link ConsumerRebalanceListener} callback is performed.
      *
-     * @param future                    Event that contains a {@link CompletableFuture}; it is on this future that the
-     *                                  application thread will wait for completion
-     * @param timer                     Overall timer that bounds how long to wait for the event to complete
-     * @param ignoreErrorEventException Predicate to ignore background errors.
-     *                                  Any exceptions found while processing background events that match the predicate won't be propagated.
-     * @return {@code true} if the event completed within the timeout, {@code false} otherwise
+     * <p/>
+     *
+     * There is a conflict between the needs of the {@link ConsumerRebalanceListener} and internal event processing
+     * when it comes to handling the current thread's interrupt state. To maintain compatibility with the
+     * {@link ClassicKafkaConsumer}'s handling of rebalance listeners, the interrupt state for the current thread
+     * will be preserved when invoking callbacks. However, because of the internal use of {@link Future#get()} to
+     * wait for event responses, the current thread cannot exist in an interrupted state. The flag is cleared before
+     * handling events so that calls to {@link Future#get()} do not immediately throw {@link TimeoutException}s.
+     * This method will conditionally set the current thread's interrupted flag prior to processing background events
+     * so that if there are any rebalance listeners, the interrupt state will be preserved. Immediately after
+     * processing the background events, the thread's interrupted flag is cleared.
+     *
+     * @param future Future from {@link UnsubscribeEvent}
+     * @param wasInterrupted {@code true} if the current thread was previously interrupted, {@code false} otherwise
+     * @param timer Timer which constrains the runtime of the operation
      */
-    // Visible for testing
-    <T> T processBackgroundEvents(Future<T> future, Timer timer, Predicate<Exception> ignoreErrorEventException) {
+    void waitForUnsubscribe(final CompletableFuture<?> future, final boolean wasInterrupted, final Timer timer) {
+        // At this point, the unsubscribe process is on its way. The application thread has no direct way of knowing
+        // where the background thread is in its journey of unsubscribing, hence this loop...
         do {
+            // Depending on a number of variables (described in the method comments), a
+            // ConsumerRebalanceListenerCallbackNeededEvent may or may not appear in the background event queue.
+            // So there's really no choice but to process any events in the queue in case that event is waiting for
+            // the application thread to pick up and invoke the callback handler.
             boolean hadEvents = false;
+
             try {
-                hadEvents = processBackgroundEvents();
-            } catch (Exception e) {
-                if (!ignoreErrorEventException.test(e))
+                if (wasInterrupted)
+                    Thread.currentThread().interrupt();
+
+                try {
+                    hadEvents = processBackgroundEvents();
+                } catch (InvalidTopicException e) {
+                    // If users subscribe to an invalid topic name, they will get InvalidTopicException in error events,
+                    // because network thread keeps trying to send MetadataRequest in the background.
+                    // Ignore it to avoid unsubscribe failed.
+                } catch (Exception e) {

Review Comment:
   Yes. Thanks for catching that. I removed it.
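
   For readers following the interrupt-flag handling described in the Javadoc of the quoted hunk, here is a minimal standalone sketch of the same idea. It uses only `java.util.concurrent` and is not the Kafka implementation: `InterruptFlagSketch`, `waitBriefly`, and `callbackThenWait` are hypothetical names, and the exception that surfaces inside Kafka may differ because Kafka wraps its waits in its own utilities. The sketch shows that a timed `get()` on a pending future fails immediately while the thread's interrupt flag is set, and that restoring the flag for a callback and clearing it afterwards lets the wait run normally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; none of these names exist in Kafka.
class InterruptFlagSketch {

    // Waits briefly on the future and reports how the wait ended.
    static String waitBriefly(CompletableFuture<?> future) {
        try {
            future.get(10, TimeUnit.MILLISECONDS);
            return "completed";
        } catch (InterruptedException e) {
            // Thrown without waiting when the interrupt flag is already set.
            return "interrupted";
        } catch (TimeoutException e) {
            return "timeout";
        } catch (ExecutionException e) {
            return "failed";
        }
    }

    // Restores the interrupt flag while the callback runs, then clears it
    // so the subsequent wait is not cut short by a stale interrupt.
    static String callbackThenWait(boolean wasInterrupted,
                                   Runnable callback,
                                   CompletableFuture<?> future) {
        if (wasInterrupted)
            Thread.currentThread().interrupt();
        try {
            callback.run();
        } finally {
            Thread.interrupted(); // reads and clears the flag
        }
        return waitBriefly(future);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();

        // Flag set: the wait ends at once instead of timing out.
        Thread.currentThread().interrupt();
        System.out.println(waitBriefly(pending));
        Thread.interrupted(); // make sure the flag is clear before the next step

        // Callback observes the flag; the wait afterwards is unaffected.
        boolean[] seen = new boolean[1];
        String outcome = callbackThenWait(
            true, () -> seen[0] = Thread.currentThread().isInterrupted(), pending);
        System.out.println("callback saw interrupt: " + seen[0] + ", wait: " + outcome);
    }
}
```

   The `finally` block mirrors the Javadoc's "immediately after processing the background events, the thread's interrupted flag is cleared"; whether the caller later re-interrupts the thread is outside the scope of this sketch.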



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
