lianetm commented on code in PR #15215:
URL: https://github.com/apache/kafka/pull/15215#discussion_r1457609049


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1134,9 +1134,22 @@ private CompletableFuture<Void> assignPartitions(
         // Make assignment effective on the client by updating the subscription state.
         updateSubscription(assignedPartitions, false);
 
+        // Pause partitions to ensure that fetch does not start until the callback completes.
+        assignedPartitions.forEach(tp -> subscriptions.pause(tp.topicPartition()));

Review Comment:
   Hey, I totally get it: pause only prevents fetching, but we also need to 
prevent initializing the offsets, since they could change during the ongoing 
callback. I missed that. I'm still thinking about this, but I see 2 approaches 
for now:
   1. Introduce a new state for the newly added partitions, with changes in 
the logic to avoid fetch and initialization based on this state (not on pause). 
Partitions would be in this state while the callback executes, and then move 
into INITIALIZING as if they had just been added. This one is tricky and 
needs more thinking: it would change several core parts of the subscription 
state, which is used by many components, so it is hard to even start assessing 
the impact. 
   2. Continue using the pause mechanism to prevent fetching (no re-inventing 
the wheel for that), and change `initWithCommittedOffsetsIfNeeded` for the 
new consumer only, so that it initializes positions only for partitions that 
are INITIALIZING and not paused (conceptually, INITIALIZING and paused plays 
the role of that new state in the new consumer while the callback runs). That 
would probably achieve what we want while changing only the new consumer, 
which I like, since this whole situation exists only there, due to its 
multi-threaded app/background design. Still thinking about potential unwanted 
impact of skipping position initialization for paused partitions that are 
INITIALIZING...
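
   To make approach 2 concrete, here is a rough sketch of the filtering I have 
in mind. It is a toy model, not the real `SubscriptionState`: the class 
`PausedInitSketch`, the `FetchState` enum, `PartitionState`, 
`partitionsToInitialize` and the string partition names are all made up for 
illustration, and only the method name `initWithCommittedOffsetsIfNeeded` 
comes from the actual code. The assumption is that a partition stays paused 
while the callback runs and is resumed when it completes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model; none of these types are Kafka's real classes.
public class PausedInitSketch {
    enum FetchState { INITIALIZING, FETCHING }

    static final class PartitionState {
        FetchState fetchState = FetchState.INITIALIZING;
        boolean paused;
        Long position; // null until a position has been initialized
    }

    private final Map<String, PartitionState> assigned = new LinkedHashMap<>();

    // Add a partition; pass paused = true while the (simulated) callback runs.
    void assign(String partition, boolean paused) {
        PartitionState state = new PartitionState();
        state.paused = paused;
        assigned.put(partition, state);
    }

    // Called once the callback has completed for this partition.
    void resume(String partition) {
        assigned.get(partition).paused = false;
    }

    Long positionOf(String partition) {
        return assigned.get(partition).position;
    }

    // Only partitions that are INITIALIZING and not paused are eligible.
    List<String> partitionsToInitialize() {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, PartitionState> entry : assigned.entrySet()) {
            PartitionState state = entry.getValue();
            if (state.fetchState == FetchState.INITIALIZING && !state.paused) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    // Sets positions from committed offsets, skipping paused partitions.
    void initWithCommittedOffsetsIfNeeded(Map<String, Long> committed) {
        for (String partition : partitionsToInitialize()) {
            Long offset = committed.get(partition);
            if (offset != null) {
                PartitionState state = assigned.get(partition);
                state.position = offset;
                state.fetchState = FetchState.FETCHING;
            }
        }
    }

    public static void main(String[] args) {
        PausedInitSketch subscriptions = new PausedInitSketch();
        subscriptions.assign("topic-0", true);  // callback still running
        subscriptions.assign("topic-1", false); // not paused

        Map<String, Long> committed = new LinkedHashMap<>();
        committed.put("topic-0", 5L);
        committed.put("topic-1", 7L);
        subscriptions.initWithCommittedOffsetsIfNeeded(committed);
        System.out.println("topic-0 -> " + subscriptions.positionOf("topic-0"));
        System.out.println("topic-1 -> " + subscriptions.positionOf("topic-1"));

        // The committed offset changes during the callback, then it completes.
        committed.put("topic-0", 10L);
        subscriptions.resume("topic-0");
        subscriptions.initWithCommittedOffsetsIfNeeded(committed);
        System.out.println("topic-0 -> " + subscriptions.positionOf("topic-0"));
    }
}
```

   In this model the paused partition keeps no position until it is resumed, 
so it picks up the offset as it stands after the callback. It says nothing 
about user-paused partitions that are still INITIALIZING, which is exactly the 
impact I'm still unsure about.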



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
