lucasbru commented on code in PR #21495:
URL: https://github.com/apache/kafka/pull/21495#discussion_r2940482287


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -1227,11 +1236,24 @@ private CompletableFuture<Void> assignPartitions(
         return result;
     }
 
+    /**
+     * Returns true if assignment changes can happen outside of 
consumer.poll().
+     * When false, the assignment update will be deferred to the application 
thread and applied
+     * during poll, ensuring that consumer.assignment() only changes within a 
call to consumer.poll().
+     *
+     * @return true if assignment changes can happen outside poll, false to 
defer to poll
+     */
+    protected abstract boolean allowAssignmentChangeOutsidePoll();

Review Comment:
   Seems like there is a connection between this and signalPartitionsAssigned. 
I wonder if it wouldn't be cleaner to combine the two (i.e. call 
updateSubscriptionAwaitingCallback from a function like 
signalPartitionsAssigned which is abstract and implemented differently for 
share vs. consumer.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -220,12 +227,51 @@ private void process(final ErrorEvent event) {
             throw event.error();
         }
 
-        private void process(final 
ConsumerRebalanceListenerCallbackNeededEvent event) {
+        /**
+         * Processing this event will perform the actions needed in the app 
thread when new partitions are reconciled in the background:
+         * - apply assignment changes (ensuring they happen in the background 
but triggered within the app thread poll)
+         * - run onPartitionsAssigned callback if present
+         * - notify background thread so it can carry on (e.g., send ack to 
the broker)
+         */
+        private void process(final PartitionsAssignedEvent event) {
+
+            applyNewAssignment(event);
+
+            if (subscriptions.rebalanceListener().isEmpty()) {
+                event.future().complete(null);
+            } else {
+                
invokeRebalanceCallbackAndNotifyBackgroundThread(ON_PARTITIONS_ASSIGNED, 
event.addedPartitions(), event.future());
+            }
+        }
+
+        /**
+         * Send event to the background to update the assignment in the 
subscription state.
+         * BLock on it to complete to ensure the assignment change happens 
within a call to

Review Comment:
   ```suggestion
            * Block on it to complete to ensure the assignment change happens 
within a call to
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to