kirktrue commented on code in PR #16686: URL: https://github.com/apache/kafka/pull/16686#discussion_r1823645974
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1813,36 +1832,64 @@ private boolean processBackgroundEvents() {
      * execution of the rebalancing logic. The rebalancing logic cannot complete until the
      * {@link ConsumerRebalanceListener} callback is performed.
      *
-     * @param future Event that contains a {@link CompletableFuture}; it is on this future that the
-     *               application thread will wait for completion
-     * @param timer Overall timer that bounds how long to wait for the event to complete
-     * @param ignoreErrorEventException Predicate to ignore background errors.
-     *                                  Any exceptions found while processing background events that match the predicate won't be propagated.
-     * @return {@code true} if the event completed within the timeout, {@code false} otherwise
+     * <p/>
+     *
+     * There is a conflict between the needs of the {@link ConsumerRebalanceListener} and internal event processing
+     * when it comes to handling the current thread's interrupt state. To maintain compatibility with the
+     * {@link ClassicKafkaConsumer}'s handling of rebalance listeners, the interrupt state for the current thread
+     * will be preserved when invoking callbacks. However, because of the internal use of {@link Future#get()} to
+     * wait for event responses, the current thread cannot exist in an interrupted state. The flag is cleared before
+     * handling events so that calls to {@link Future#get()} do not immediately throw {@link TimeoutException}s.
+     * This method will conditionally set the current thread's interrupted flag prior to processing background events
+     * so that if there are any rebalance listeners, the interrupt state will be preserved. Immediately after
+     * processing the background events, the thread's interrupted flag is cleared.
+     *
+     * @param future Future from {@link UnsubscribeEvent}
+     * @param wasInterrupted {@code true} if the current thread was previously interrupted, {@code false} otherwise
+     * @param timer Timer which constrains the runtime of the operation
      */
-    // Visible for testing
-    <T> T processBackgroundEvents(Future<T> future, Timer timer, Predicate<Exception> ignoreErrorEventException) {
+    void waitForUnsubscribe(final CompletableFuture<?> future, final boolean wasInterrupted, final Timer timer) {
+        // At this point, the unsubscribe process is on its way. The application thread has no direct way of knowing
+        // where the background thread is in its journey of unsubscribing, hence this loop...
         do {
+            // Depending on a number of variables (described in the method comments), a
+            // ConsumerRebalanceListenerCallbackNeededEvent may or may not appear in the background event queue.
+            // So there's really no choice but to process any events in the queue in case that event is waiting for
+            // the application thread to pick up and invoke the callback handler.
             boolean hadEvents = false;
+
             try {
-                hadEvents = processBackgroundEvents();
-            } catch (Exception e) {
-                if (!ignoreErrorEventException.test(e))
+                if (wasInterrupted)
+                    Thread.currentThread().interrupt();
+
+                try {
+                    hadEvents = processBackgroundEvents();
+                } catch (InvalidTopicException e) {
+                    // If users subscribe to an invalid topic name, they will get InvalidTopicException in error events,
+                    // because network thread keeps trying to send MetadataRequest in the background.
+                    // Ignore it to avoid unsubscribe failed.
+                } catch (Exception e) {

Review Comment:
   Yes. Thanks for catching that. I removed it.
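For readers following the interrupt-handling discussion in the Javadoc above, here is a minimal, standalone sketch of the underlying JDK behavior. It is not the code from this PR: the class `InterruptFlagSketch` and both helper methods are hypothetical names chosen for illustration, and the sketch covers only the simpler half of the pattern (clear the flag before waiting on a future, restore it afterwards). It assumes nothing about Kafka internals and uses only `java.util.concurrent`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration class; not part of the Kafka code base.
public class InterruptFlagSketch {

    // Returns true if a timed get() on an incomplete future fails fast
    // with InterruptedException because the calling thread is interrupted.
    static boolean getFailsFastWhenInterrupted() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        Thread.currentThread().interrupt();
        try {
            pending.get(5, TimeUnit.SECONDS);
            return false;
        } catch (InterruptedException e) {
            return true;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        } finally {
            // Leave the thread in a clean state for the caller.
            Thread.interrupted();
        }
    }

    // Clears the interrupt flag so the wait can proceed, then restores it
    // afterwards so that later code still observes the interrupt.
    static <T> T waitPreservingInterrupt(CompletableFuture<T> future, long timeoutMs) {
        boolean wasInterrupted = Thread.interrupted(); // reads and clears the flag
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("wait failed", e);
        } finally {
            if (wasInterrupted)
                Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("fails fast when interrupted: " + getFailsFastWhenInterrupted());

        Thread.currentThread().interrupt();
        String result = waitPreservingInterrupt(CompletableFuture.supplyAsync(() -> "done"), 2000L);
        System.out.println("result: " + result);
        System.out.println("flag restored: " + Thread.interrupted());
    }
}
```

The method in the diff differs in that it receives `wasInterrupted` as a parameter and re-sets the flag specifically around `processBackgroundEvents()`, so that any rebalance listener callbacks see the interrupt; the sketch only shows why the flag has to be cleared before blocking on a future at all.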