kirktrue commented on code in PR #16686: URL: https://github.com/apache/kafka/pull/16686#discussion_r1823645497
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1781,30 +1776,54 @@ private boolean processBackgroundEvents() {
     }

     /**
-     * This method can be used by cases where the caller has an event that needs to both block for completion but
-     * also process background events. For some events, in order to fully process the associated logic, the
-     * {@link ConsumerNetworkThread background thread} needs assistance from the application thread to complete.
-     * If the application thread simply blocked on the event after submitting it, the processing would deadlock.
-     * The logic herein is basically a loop that performs two tasks in each iteration:
-     *
-     * <ol>
-     *     <li>Process background events, if any</li>
-     *     <li><em>Briefly</em> wait for {@link CompletableApplicationEvent an event} to complete</li>
-     * </ol>
+     * When unsubscribing, the application thread enqueues an {@link UnsubscribeEvent} on the application event queue.
+     * That event will eventually trigger the rebalancing logic in the background thread.
+     * Critically, as part of this rebalancing work, any
+     * {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection) rebalance listener callback} may need to be
+     * invoked.
      *
      * <p/>
      *
-     * Each iteration gives the application thread an opportunity to process background events, which may be
-     * necessary to complete the overall processing.
+     * There are a number of challenges that arise during this seemingly simple process:
      *
-     * <p/>
+     * <ul>
+     *     <li>
+     *         While most of the unsubscribe logic is performed on the background thread, the
+     *         {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection) rebalance listener callback} must be
+     *         executed on the application thread. This requires a delicate dance between the two threads that is
+     *         orchestrated via the {@link ConsumerRebalanceListenerCallbackNeededEvent} and
+     *         {@link ConsumerRebalanceListenerCallbackCompletedEvent} events.
+     *     </li>
+     *     <li>
+     *         The user may or may not be using a callback handler. This can be deduced by
+     *         {@link SubscriptionState#rebalanceListener() checking for rebalance listener}.
+     *     </li>
+     *     <li>
+     *         If, at the time of unsubscribe, the consumer does not have any partitions assigned, the background
+     *         thread will <em>not</em> enqueue a {@link ConsumerRebalanceListenerCallbackNeededEvent} to signal the
+     *         application thread to execute the
+     *         {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection) rebalance listener callback}. It's
+     *         technically possible to
+     *         {@link SubscriptionState#assignedPartitions() check if there are any partitions assigned} in the

Review Comment:
   Same as above... I updated it to avoid that usage.
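
For illustration only, the two-thread handoff described in the Javadoc quoted above can be sketched in plain Java. This is not the code in `AsyncKafkaConsumer`: the class `CallbackHandoffSketch`, the methods `processWhileWaiting` and `runDemo`, the `Runnable` queue, and the 10 ms poll interval are all hypothetical stand-ins. The queue plays the role of the background event queue, the queued `Runnable` plays the role of a "callback needed" event, and completing `callbackCompleted` plays the role of a "callback completed" event.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch; none of the names below are Kafka classes or methods.
final class CallbackHandoffSketch {

    /**
     * Blocks until {@code event} completes, running any queued background work
     * on the calling (application) thread between short waits.
     */
    static <T> T processWhileWaiting(CompletableFuture<T> event,
                                     BlockingQueue<Runnable> backgroundEvents,
                                     Duration timeout)
            throws InterruptedException, ExecutionException, TimeoutException {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            // 1. Run whatever the background thread has handed to this thread.
            Runnable task;
            while ((task = backgroundEvents.poll()) != null) {
                task.run();
            }
            // 2. Briefly wait for the event; on a short timeout, loop again
            //    unless the overall deadline has passed.
            try {
                return event.get(10, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                if (System.nanoTime() - deadlineNanos >= 0) {
                    throw new TimeoutException("event did not complete within " + timeout);
                }
            }
        }
    }

    /** Returns the name of the thread that ran the simulated listener callback. */
    static String runDemo() {
        BlockingQueue<Runnable> backgroundEvents = new LinkedBlockingQueue<>();
        CompletableFuture<Void> unsubscribeEvent = new CompletableFuture<>();
        CompletableFuture<String> callbackCompleted = new CompletableFuture<>();

        Thread background = new Thread(() -> {
            // "Callback needed": ask the application thread to run the listener.
            backgroundEvents.add(
                () -> callbackCompleted.complete(Thread.currentThread().getName()));
            // "Callback completed": only then finish the unsubscribe.
            callbackCompleted.join();
            unsubscribeEvent.complete(null);
        }, "background");
        background.setDaemon(true);
        background.start();

        try {
            processWhileWaiting(unsubscribeEvent, backgroundEvents, Duration.ofSeconds(5));
            return callbackCompleted.get();
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("callback ran on: " + runDemo());
    }
}
```

If the application thread called `unsubscribeEvent.get()` directly instead of looping, the queued callback would never run and the background thread would wait on `callbackCompleted` forever, which is the deadlock the removed Javadoc warns about.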