lucasbru commented on code in PR #15742:
URL: https://github.com/apache/kafka/pull/15742#discussion_r1570342805


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -228,13 +228,16 @@ private void process(final ErrorEvent event) {
         }
 
         private void process(final 
ConsumerRebalanceListenerCallbackNeededEvent event) {
-            ApplicationEvent invokedEvent = invokeRebalanceCallbacks(
+            ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = 
invokeRebalanceCallbacks(
                 rebalanceListenerInvoker,
                 event.methodName(),
                 event.partitions(),
                 event.future()
             );
             applicationEventHandler.add(invokedEvent);
+            if (invokedEvent.error().isPresent()) {
+                throw invokedEvent.error().get();

Review Comment:
   Looking at the reconciliation logic, I think if `onPartitionsRevoked` 
throws, we'll not execute `onPartitionsAssigned`. And the call to 
`onPartitionsLost` seems to be independent of reconciliation. So not sure how 
we'd end up with two exceptions.
   
   You are right that there is a behavioral difference around finishing the 
reconciliation. The old consumer throws _after_ finishing the reconciliation, 
while the new consumer throws on a different thread, so there is no strict time 
ordering between finishing the reconciliation and throwing. But I'm struggling 
to see how one can observe the difference. The reconciliation will have 
finished the next time the background thread processes any events, so in a 
sense, you cannot observe the difference based on the queue architecture. The 
difference may only be observable through shared state that breaks the 
queue-based architecture. SubscriptionState comes to mind here. Thinking of 
something like
   
   1. application thread enters poll, fails during rebalance listener execution 
and throws
   2. application thread somehow reads subscription state
   3. background thread updates subscription state as part of reconciliation
   
   Now the application thread has observed an "incomplete reconciliation". But 
after a listener execution has failed, we don't seem to update the subscription 
state in the reconciliation. 
   
   So in summary - not sure if we are going to notice the different behaviors?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to