lianetm commented on code in PR #15215:
URL: https://github.com/apache/kafka/pull/15215#discussion_r1463584138


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1134,9 +1134,38 @@ private CompletableFuture<Void> assignPartitions(
         // Make assignment effective on the client by updating the subscription state.
         updateSubscription(assignedPartitions, false);
 
+        // Mark assigned partitions as pendingOnAssignedCallback to temporarily stop fetching or
+        // initializing positions for them. Passing the full set of assigned partitions
+        // (previously owned and newly added), given that they are all provided to the user in the
+        // callback, so we could expect offsets updates for any of them.
+        Set<TopicPartition> assignedTopicPartition = assignedPartitions.stream().map(tIdp -> tIdp.topicPartition()).collect(Collectors.toSet());
+        subscriptions.markPendingOnAssignedCallback(assignedTopicPartition, true);
+
         // Invoke user call back.
         CompletableFuture<Void> result = invokeOnPartitionsAssignedCallback(addedPartitions);
 
+        // Resume partitions only if the callback succeeded.
+        result.whenComplete((error, callbackResult) -> {
+            if (error == null) {
+                // Remove pendingOnAssignedCallback flag from the assigned partitions, so we can
+                // start fetching, and updating positions for them if needed.
+                subscriptions.markPendingOnAssignedCallback(assignedTopicPartition, false);
+            } else {
+                // Remove pendingOnAssignedCallback flag from the previously owned only so that

Review Comment:
   Yes, you got it right. When the callback fails, the assignment is not acked
   to the broker, and it remains as `assignmentReadyToReconcile` on the client.
   The next poll-based reconciliation then retries that assignment, and the
   retries continue until it succeeds or the broker removes it from the
   assignment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

