rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1381127886
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ########## @@ -251,144 +260,52 @@ private ConsumerGroupMember transitionToNewTargetAssignmentState() { } if (!newPartitionsPendingRevocation.isEmpty()) { - // If the partition pending revocation set is not empty, we transition the - // member to revoking and keep the current epoch. The transition to the new - // state is done when the member is updated. + // If there are partitions to be revoked, the member remains in its current + // epoch and requests the revocation of those partitions. It transitions to + // the UNACKNOWLEDGED_ASSIGNMENT state to wait until the new assignment is + // acknowledged. return new ConsumerGroupMember.Builder(member) + .setState(ConsumerGroupMember.MemberState.UNACKNOWLEDGED_ASSIGNMENT) .setAssignedPartitions(newAssignedPartitions) - .setPartitionsPendingRevocation(newPartitionsPendingRevocation) - .setPartitionsPendingAssignment(newPartitionsPendingAssignment) - .setTargetMemberEpoch(targetAssignmentEpoch) + .setRevokedPartitions(newPartitionsPendingRevocation) .build(); - } else { - if (!newPartitionsPendingAssignment.isEmpty()) { - // If the partitions pending assignment set is not empty, we check - // if some or all partitions are free to use. If they are, we move - // them to the partitions assigned set. - maybeAssignPendingPartitions(newAssignedPartitions, newPartitionsPendingAssignment); - } - - // We transition to the target epoch. If the partitions pending assignment - // set is empty, the member transition to stable, otherwise to assigning. - // The transition to the new state is done when the member is updated. + } else if (!newPartitionsPendingAssignment.isEmpty()) { + // If there are partitions to be assigned, the member transitions to the + // target epoch and requests the assignment of those partitions. Note that + // the partitions are directly added to the assigned partitions set. The + // member transitions to the UNACKNOWLEDGED_ASSIGNMENT state to wait until + // the new assignment is acknowledged. + newPartitionsPendingAssignment.forEach((topicId, partitions) -> { + newAssignedPartitions + .computeIfAbsent(topicId, __ -> new HashSet<>()) + .addAll(partitions); + }); return new ConsumerGroupMember.Builder(member) + .setState(ConsumerGroupMember.MemberState.UNACKNOWLEDGED_ASSIGNMENT) + .updateMemberEpoch(targetAssignmentEpoch) .setAssignedPartitions(newAssignedPartitions) - .setPartitionsPendingRevocation(Collections.emptyMap()) - .setPartitionsPendingAssignment(newPartitionsPendingAssignment) - .setPreviousMemberEpoch(member.memberEpoch()) - .setMemberEpoch(targetAssignmentEpoch) - .setTargetMemberEpoch(targetAssignmentEpoch) + .setRevokedPartitions(Collections.emptyMap()) .build(); - } - } - - /** - * Tries to transition from Revoke to Assigning or Stable. This is only - * possible when the member acknowledges that it only owns the partition - * in the assigned partitions. - * - * @return A new ConsumerGroupMember with the new state or the current one - * if the member stays in the current state. - */ - private ConsumerGroupMember maybeTransitionFromRevokingToAssigningOrStable() { - if (member.partitionsPendingRevocation().isEmpty() || matchesAssignedPartitions(ownedTopicPartitions)) { - Map<Uuid, Set<Integer>> newAssignedPartitions = deepCopy(member.assignedPartitions()); - Map<Uuid, Set<Integer>> newPartitionsPendingAssignment = deepCopy(member.partitionsPendingAssignment()); - - if (!newPartitionsPendingAssignment.isEmpty()) { - // If the partitions pending assignment set is not empty, we check - // if some or all partitions are free to use. If they are, we move - // them to the assigned set. - maybeAssignPendingPartitions(newAssignedPartitions, newPartitionsPendingAssignment); - } - - // We transition to the target epoch. If the partitions pending assignment - // set is empty, the member transition to stable, otherwise to assigning. - // The transition to the new state is done when the member is updated. + } else if (hasUnreleasedPartitions) { + // If there are no partitions to be revoked nor to be assigned but some Review Comment: Please do correct me if I'm wrong but I was under the impression that unreleased partitions were those that had to be revoked from older members to be assigned to another member, what is the situation when there's no partitions to be revoked or assigned but they're not available? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org