rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381127886


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##########
@@ -251,144 +260,52 @@ private ConsumerGroupMember transitionToNewTargetAssignmentState() {
         }
 
         if (!newPartitionsPendingRevocation.isEmpty()) {
-            // If the partition pending revocation set is not empty, we transition the
-            // member to revoking and keep the current epoch. The transition to the new
-            // state is done when the member is updated.
+            // If there are partitions to be revoked, the member remains in its current
+            // epoch and requests the revocation of those partitions. It transitions to
+            // the UNACKNOWLEDGED_ASSIGNMENT state to wait until the new assignment is
+            // acknowledged.
             return new ConsumerGroupMember.Builder(member)
+                .setState(ConsumerGroupMember.MemberState.UNACKNOWLEDGED_ASSIGNMENT)
                 .setAssignedPartitions(newAssignedPartitions)
-                .setPartitionsPendingRevocation(newPartitionsPendingRevocation)
-                .setPartitionsPendingAssignment(newPartitionsPendingAssignment)
-                .setTargetMemberEpoch(targetAssignmentEpoch)
+                .setRevokedPartitions(newPartitionsPendingRevocation)
                 .build();
-        } else {
-            if (!newPartitionsPendingAssignment.isEmpty()) {
-                // If the partitions pending assignment set is not empty, we check
-                // if some or all partitions are free to use. If they are, we move
-                // them to the partitions assigned set.
-                maybeAssignPendingPartitions(newAssignedPartitions, newPartitionsPendingAssignment);
-            }
-
-            // We transition to the target epoch. If the partitions pending assignment
-            // set is empty, the member transition to stable, otherwise to assigning.
-            // The transition to the new state is done when the member is updated.
+        } else if (!newPartitionsPendingAssignment.isEmpty()) {
+            // If there are partitions to be assigned, the member transitions to the
+            // target epoch and requests the assignment of those partitions. Note that
+            // the partitions are directly added to the assigned partitions set. The
+            // member transitions to the UNACKNOWLEDGED_ASSIGNMENT state to wait until
+            // the new assignment is acknowledged.
+            newPartitionsPendingAssignment.forEach((topicId, partitions) -> {
+                newAssignedPartitions
+                    .computeIfAbsent(topicId, __ -> new HashSet<>())
+                    .addAll(partitions);
+            });
             return new ConsumerGroupMember.Builder(member)
+                .setState(ConsumerGroupMember.MemberState.UNACKNOWLEDGED_ASSIGNMENT)
+                .updateMemberEpoch(targetAssignmentEpoch)
                 .setAssignedPartitions(newAssignedPartitions)
-                .setPartitionsPendingRevocation(Collections.emptyMap())
-                .setPartitionsPendingAssignment(newPartitionsPendingAssignment)
-                .setPreviousMemberEpoch(member.memberEpoch())
-                .setMemberEpoch(targetAssignmentEpoch)
-                .setTargetMemberEpoch(targetAssignmentEpoch)
+                .setRevokedPartitions(Collections.emptyMap())
                 .build();
-        }
-    }
-
-    /**
-     * Tries to transition from Revoke to Assigning or Stable. This is only
-     * possible when the member acknowledges that it only owns the partition
-     * in the assigned partitions.
-     *
-     * @return A new ConsumerGroupMember with the new state or the current one
-     *         if the member stays in the current state.
-     */
-    private ConsumerGroupMember maybeTransitionFromRevokingToAssigningOrStable() {
-        if (member.partitionsPendingRevocation().isEmpty() || matchesAssignedPartitions(ownedTopicPartitions)) {
-            Map<Uuid, Set<Integer>> newAssignedPartitions = deepCopy(member.assignedPartitions());
-            Map<Uuid, Set<Integer>> newPartitionsPendingAssignment = deepCopy(member.partitionsPendingAssignment());
-
-            if (!newPartitionsPendingAssignment.isEmpty()) {
-                // If the partitions pending assignment set is not empty, we check
-                // if some or all partitions are free to use. If they are, we move
-                // them to the assigned set.
-                maybeAssignPendingPartitions(newAssignedPartitions, newPartitionsPendingAssignment);
-            }
-
-            // We transition to the target epoch. If the partitions pending assignment
-            // set is empty, the member transition to stable, otherwise to assigning.
-            // The transition to the new state is done when the member is updated.
+        } else if (hasUnreleasedPartitions) {
+            // If there are no partitions to be revoked nor to be assigned but some

Review Comment:
   Please correct me if I'm wrong, but I was under the impression that
   unreleased partitions are those that still have to be revoked from older
   members before they can be assigned to another member. In what situation
   are there no partitions to be revoked or assigned, yet some partitions are
   still unavailable?
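
   To make the question concrete, here is a minimal sketch of the case I have
   in mind. It is my own illustration, not code from this PR: the class, the
   `nextStep` helper, the single-topic sets, and the returned labels are all
   hypothetical. It assumes (and the hunk above does not confirm this) that
   partitions still owned by another member are dropped from the pending
   assignment set and only remembered through a flag.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; none of these names come from CurrentAssignmentBuilder.
class UnreleasedPartitionsSketch {
    static String nextStep(Set<Integer> current, Set<Integer> target, Set<Integer> ownedByOthers) {
        // Partitions the member owns but that are no longer in its target assignment.
        Set<Integer> pendingRevocation = new HashSet<>(current);
        pendingRevocation.removeAll(target);

        // Partitions in the target assignment that the member does not own yet.
        Set<Integer> pendingAssignment = new HashSet<>(target);
        pendingAssignment.removeAll(current);

        // Assumption: partitions still held by another member are filtered out
        // here, and only the fact that some were filtered is kept.
        boolean hasUnreleasedPartitions = pendingAssignment.removeIf(ownedByOthers::contains);

        if (!pendingRevocation.isEmpty()) {
            return "REVOKE";
        } else if (!pendingAssignment.isEmpty()) {
            return "ASSIGN";
        } else if (hasUnreleasedPartitions) {
            return "WAIT_FOR_RELEASE";
        } else {
            return "STABLE";
        }
    }
}
```

   Under that assumption, a member that owns partitions 0 and 1 with a target
   of 0, 1 and 2, where 2 is still held by another member, has nothing to
   revoke and nothing it can be assigned right now, so it would land in the
   third branch. Is that the intended reading?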


