dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1492583068


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##########
@@ -170,72 +127,119 @@ public CurrentAssignmentBuilder withOwnedTopicPartitions(
      * @return A new ConsumerGroupMember or the current one.
      */
     public ConsumerGroupMember build() {
-        // A new target assignment has been installed, we need to restart
-        // the reconciliation loop from the beginning.
-        if (targetAssignmentEpoch != member.targetMemberEpoch()) {
-            return transitionToNewTargetAssignmentState();
-        }
-
         switch (member.state()) {
-            // Check if the partitions have been revoked by the member.
-            case REVOKING:
-                return maybeTransitionFromRevokingToAssigningOrStable();
+            case STABLE:
+                // When the member is in the STABLE state, we verify if a newer
+                // epoch (or target assignment) is available. If it is, we can
+                // reconcile the member towards it. Otherwise, we return.
+                if (member.memberEpoch() != targetAssignmentEpoch) {
+                    return computeNextAssignment(
+                        member.memberEpoch(),
+                        member.assignedPartitions()
+                    );
+                } else {
+                    return member;
+                }
 
-            // Check if pending partitions have been freed up.
-            case ASSIGNING:
-                return maybeTransitionFromAssigningToAssigningOrStable();
+            case UNREVOKED_PARTITIONS:
+                // When the member is in the UNREVOKED_PARTITIONS state, we wait
+                // until the member has revoked the necessary partitions. They are
+                // considered revoked when they are not anymore reported in the
+                // owned partitions set in the ConsumerGroupHeartbeat API.
 
-            // Nothing to do.
-            case STABLE:
-                return member;
+                // If the member does not provide its owned partitions. We cannot
+                // progress.
+                if (ownedTopicPartitions == null) {
+                    return member;
+                }
+
+                // If the member provides its owned partitions. We verify if it still
+                // owens any of the revoked partitions. If it does, we cannot progress.
+                for (ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions : ownedTopicPartitions) {
+                    for (Integer partitionId : topicPartitions.partitions()) {
+                        boolean stillHasRevokedPartition = member
+                            .revokedPartitions()
+                            .getOrDefault(topicPartitions.topicId(), Collections.emptySet())
+                            .contains(partitionId);
+                        if (stillHasRevokedPartition) {
+                            return member;
+                        }
+                    }
+                }
+
+                // When the member has revoked all the pending partitions, it can
+                // transition to the next epoch (current + 1) and we can reconcile
+                // its state towards the latest target assignment.
+                return computeNextAssignment(
+                    member.memberEpoch() + 1,

Review Comment:
   Yes, it does in the code you extracted. Let's take an example:
   1) Member A has partitions 1 and 2 in epoch 10.
   2) Target assignment changes the assignment of A to 2 in epoch 11.
   3) Member A enters the UNREVOKED_PARTITIONS state to revoke 1 and stays in epoch 10.
   4) Target assignment changes the assignment of A to 3 in epoch 12 (based on a heartbeat from another member).
   5) A comes back to acknowledge that 1 is gone. Now A can transition to epoch 11 because it has revoked the partitions that prevented it from doing so earlier. However, it cannot transition to epoch 12 yet because it has to revoke 2 first.
   
   If we don't do this, A would remain in epoch 10 while revoking 2 and the rebalance timeout would not be reset.
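
The walkthrough above can be sketched as a small standalone simulation. This is only an illustration under simplifying assumptions, not the code in the PR: `EpochWalkthrough`, `Member`, `computeNext` and `onHeartbeat` are hypothetical names, partitions are plain integers of a single topic, and partitions still owned by other members are ignored.

```java
import java.util.HashSet;
import java.util.Set;

public class EpochWalkthrough {
    // Hypothetical, simplified member state: the member epoch, the partitions
    // it keeps, and the partitions it must revoke before it can advance.
    static final class Member {
        final int epoch;
        final Set<Integer> assigned;
        final Set<Integer> pendingRevocation;

        Member(int epoch, Set<Integer> assigned, Set<Integer> pendingRevocation) {
            this.epoch = epoch;
            this.assigned = assigned;
            this.pendingRevocation = pendingRevocation;
        }
    }

    // Reconciles towards the target, starting from the given epoch. If some
    // partitions must be revoked, the member stays at that epoch; otherwise
    // it jumps straight to the target epoch.
    static Member computeNext(int epoch, Set<Integer> assigned, int targetEpoch, Set<Integer> target) {
        Set<Integer> toRevoke = new HashSet<>(assigned);
        toRevoke.removeAll(target);
        if (!toRevoke.isEmpty()) {
            Set<Integer> kept = new HashSet<>(assigned);
            kept.retainAll(target);
            return new Member(epoch, kept, toRevoke);
        }
        return new Member(targetEpoch, new HashSet<>(target), new HashSet<>());
    }

    // Processes one heartbeat carrying the partitions the member still owns.
    static Member onHeartbeat(Member m, Set<Integer> owned, int targetEpoch, Set<Integer> target) {
        if (m.pendingRevocation.isEmpty()) {
            // Nothing to revoke: reconcile only if a newer target exists.
            return m.epoch == targetEpoch ? m : computeNext(m.epoch, m.assigned, targetEpoch, target);
        }
        for (Integer partition : owned) {
            if (m.pendingRevocation.contains(partition)) {
                return m; // Still owns a revoked partition: cannot progress.
            }
        }
        // Revocation acknowledged: bump the epoch by one, then reconcile again.
        return computeNext(m.epoch + 1, m.assigned, targetEpoch, target);
    }

    public static void main(String[] args) {
        Member a = new Member(10, Set.of(1, 2), Set.of());
        a = onHeartbeat(a, Set.of(1, 2), 11, Set.of(2)); // target {2} at epoch 11
        System.out.println(a.epoch + " pending " + a.pendingRevocation);
        a = onHeartbeat(a, Set.of(2), 12, Set.of(3));    // target {3} at epoch 12
        System.out.println(a.epoch + " pending " + a.pendingRevocation);
        a = onHeartbeat(a, Set.of(), 12, Set.of(3));     // partition 2 is gone
        System.out.println(a.epoch + " assigned " + a.assigned);
    }
}
```

In this sketch A goes 10 → 11 → 12: the `m.epoch + 1` bump is what moves it off epoch 10 once partition 1 is acknowledged as revoked, even though it still has to give up partition 2 before reaching the latest target.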


