lucasbru commented on code in PR #21495:
URL: https://github.com/apache/kafka/pull/21495#discussion_r2940482287
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -1227,11 +1236,24 @@ private CompletableFuture<Void> assignPartitions(
return result;
}
+ /**
+ * Returns true if assignment changes can happen outside of
consumer.poll().
+ * When false, the assignment update will be deferred to the application
thread and applied
+ * during poll, ensuring that consumer.assignment() only changes within a
call to consumer.poll().
+ *
+ * @return true if assignment changes can happen outside poll, false to
defer to poll
+ */
+ protected abstract boolean allowAssignmentChangeOutsidePoll();
Review Comment:
Seems like there is a connection between this and signalPartitionsAssigned.
I wonder if it wouldn't be cleaner to combine the two (i.e. call
updateSubscriptionAwaitingCallback from a function like
signalPartitionsAssigned which is abstract and implemented differently for
share vs. consumer.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -220,12 +227,51 @@ private void process(final ErrorEvent event) {
throw event.error();
}
- private void process(final
ConsumerRebalanceListenerCallbackNeededEvent event) {
+ /**
+ * Processing this event will perform the actions needed in the app
thread when new partitions are reconciled in the background:
+ * - apply assignment changes (ensuring they happen in the background
but triggered within the app thread poll)
+ * - run onPartitionsAssigned callback if present
+ * - notify background thread so it can carry on (e.g., send ack to
the broker)
+ */
+ private void process(final PartitionsAssignedEvent event) {
+
+ applyNewAssignment(event);
+
+ if (subscriptions.rebalanceListener().isEmpty()) {
+ event.future().complete(null);
+ } else {
+
invokeRebalanceCallbackAndNotifyBackgroundThread(ON_PARTITIONS_ASSIGNED,
event.addedPartitions(), event.future());
+ }
+ }
+
+ /**
+ * Send event to the background to update the assignment in the
subscription state.
+ * BLock on it to complete to ensure the assignment change happens
within a call to
Review Comment:
```suggestion
* Block on it to complete to ensure the assignment change happens
within a call to
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]