jeffkbkim commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1491825284
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ########## @@ -346,10 +346,9 @@ public static Record newCurrentAssignmentRecord( new ConsumerGroupCurrentMemberAssignmentValue() .setMemberEpoch(member.memberEpoch()) .setPreviousMemberEpoch(member.previousMemberEpoch()) - .setTargetMemberEpoch(member.targetMemberEpoch()) + .setState(member.state().value()) Review Comment: was the target member epoch only used to determine the state? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ########## @@ -515,38 +457,24 @@ public MemberState state() { } /** - * @return The set of assigned partitions. - */ - public Map<Uuid, Set<Integer>> assignedPartitions() { - return assignedPartitions; - } - - /** - * @return The set of partitions awaiting revocation from the member. + * @return True of the member is in the Stable state and at the desired epoch. Review Comment: nit: True if ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1211,13 +1192,99 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr // 1. The member reported its owned partitions; // 2. The member just joined or rejoined to group (epoch equals to zero); // 3. The member's assignment has been updated. - if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) { + if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) { response.setAssignment(createResponseAssignment(updatedMember)); } return new CoordinatorResult<>(records, response); } + /** + * Reconciles the current assignment of the member if needed. + * + * @param groupId The group id. + * @param member The member to reconcile. + * @param currentPartitionEpoch The function returning the current epoch of + * a given partition. 
+ * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @param ownedTopicPartitions The list of partitions owned by the member. This + * is reported in the ConsumerGroupHeartbeat API and + * it could be null if not provided. + * @param records The list to accumulate any new records. + * @return The received member if no changes have been made; or a new + * member containing the new assignment. + */ + private ConsumerGroupMember maybeReconcile( + String groupId, + ConsumerGroupMember member, + BiFunction<Uuid, Integer, Integer> currentPartitionEpoch, + int targetAssignmentEpoch, + Assignment targetAssignment, + List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions, + List<Record> records + ) { + if (member.isReconciledTo(targetAssignmentEpoch)) { + return member; + } + + ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(targetAssignmentEpoch, targetAssignment) + .withCurrentPartitionEpoch(currentPartitionEpoch) + .withOwnedTopicPartitions(ownedTopicPartitions) + .build(); + + if (!updatedMember.equals(member)) { + records.add(newCurrentAssignmentRecord(groupId, updatedMember)); + + log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, " + + "assignedPartitions={} and revokedPartitions={}.", + groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(), + formatAssignment(updatedMember.assignedPartitions()), formatAssignment(updatedMember.revokedPartitions())); + + if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) { + scheduleConsumerGroupRebalanceTimeout( + groupId, + updatedMember.memberId(), + updatedMember.memberEpoch(), + updatedMember.rebalanceTimeoutMs() + ); + } else { + cancelConsumerGroupRebalanceTimeout(groupId, updatedMember.memberId()); + } + } + + return updatedMember; + } + + private String formatAssignment( 
Review Comment: example ``` [10ZNcX_URLSCRf27-VKXsw-0, 10ZNcX_URLSCRf27-VKXsw-1, 10ZNcX_URLSCRf27-VKXsw-2, NJCREtlUT8yCMZGjkcTJQQ-0, NJCREtlUT8yCMZGjkcTJQQ-1, NJCREtlUT8yCMZGjkcTJQQ-2, NJCREtlUT8yCMZGjkcTJQQ-3, NJCREtlUT8yCMZGjkcTJQQ-4, NJCREtlUT8yCMZGjkcTJQQ-5] ``` the UUIDs don't look too readable ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ########## @@ -369,20 +329,12 @@ public String toString() { /** * The partitions being revoked by this member. */ - private final Map<Uuid, Set<Integer>> partitionsPendingRevocation; - - /** - * The partitions waiting to be assigned to this - * member. They will be assigned when they are - * released by their previous owners. - */ - private final Map<Uuid, Set<Integer>> partitionsPendingAssignment; + private final Map<Uuid, Set<Integer>> revokedPartitions; Review Comment: did the definition not change? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ########## @@ -170,72 +127,119 @@ public CurrentAssignmentBuilder withOwnedTopicPartitions( * @return A new ConsumerGroupMember or the current one. */ public ConsumerGroupMember build() { - // A new target assignment has been installed, we need to restart - // the reconciliation loop from the beginning. - if (targetAssignmentEpoch != member.targetMemberEpoch()) { - return transitionToNewTargetAssignmentState(); - } - switch (member.state()) { - // Check if the partitions have been revoked by the member. - case REVOKING: - return maybeTransitionFromRevokingToAssigningOrStable(); + case STABLE: + // When the member is in the STABLE state, we verify if a newer + // epoch (or target assignment) is available. If it is, we can + // reconcile the member towards it. Otherwise, we return. 
+ if (member.memberEpoch() != targetAssignmentEpoch) { + return computeNextAssignment( + member.memberEpoch(), + member.assignedPartitions() + ); + } else { + return member; + } - // Check if pending partitions have been freed up. - case ASSIGNING: - return maybeTransitionFromAssigningToAssigningOrStable(); + case UNREVOKED_PARTITIONS: + // When the member is in the UNREVOKED_PARTITIONS state, we wait + // until the member has revoked the necessary partitions. They are + // considered revoked when they are not anymore reported in the + // owned partitions set in the ConsumerGroupHeartbeat API. - // Nothing to do. - case STABLE: - return member; + // If the member does not provide its owned partitions. We cannot + // progress. + if (ownedTopicPartitions == null) { + return member; + } + + // If the member provides its owned partitions. We verify if it still + // owens any of the revoked partitions. If it does, we cannot progress. Review Comment: nit: owns ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -779,7 +779,7 @@ private void maybeUpdateGroupState() { newState = ASSIGNING; } else { for (ConsumerGroupMember member : members.values()) { - if (member.targetMemberEpoch() != targetAssignmentEpoch.get() || member.state() != ConsumerGroupMember.MemberState.STABLE) { + if (!member.isReconciledTo(targetAssignmentEpoch.get())) { Review Comment: isReconciledTo performs `memberEpoch == targetAssignmentEpoch` whereas previously we checked `member.targetMemberEpoch() != targetAssignmentEpoch.get()` here. did the meaning of memberEpoch change? i'm confused a bit on what the difference was between memberEpoch and targetMemberEpoch ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ########## @@ -170,72 +127,119 @@ public CurrentAssignmentBuilder withOwnedTopicPartitions( * @return A new ConsumerGroupMember or the current one. 
*/ public ConsumerGroupMember build() { - // A new target assignment has been installed, we need to restart - // the reconciliation loop from the beginning. - if (targetAssignmentEpoch != member.targetMemberEpoch()) { - return transitionToNewTargetAssignmentState(); - } - switch (member.state()) { - // Check if the partitions have been revoked by the member. - case REVOKING: - return maybeTransitionFromRevokingToAssigningOrStable(); + case STABLE: + // When the member is in the STABLE state, we verify if a newer + // epoch (or target assignment) is available. If it is, we can + // reconcile the member towards it. Otherwise, we return. + if (member.memberEpoch() != targetAssignmentEpoch) { + return computeNextAssignment( + member.memberEpoch(), + member.assignedPartitions() + ); + } else { + return member; + } - // Check if pending partitions have been freed up. - case ASSIGNING: - return maybeTransitionFromAssigningToAssigningOrStable(); + case UNREVOKED_PARTITIONS: + // When the member is in the UNREVOKED_PARTITIONS state, we wait + // until the member has revoked the necessary partitions. They are + // considered revoked when they are not anymore reported in the + // owned partitions set in the ConsumerGroupHeartbeat API. - // Nothing to do. - case STABLE: - return member; + // If the member does not provide its owned partitions. We cannot + // progress. + if (ownedTopicPartitions == null) { + return member; + } + + // If the member provides its owned partitions. We verify if it still + // owens any of the revoked partitions. If it does, we cannot progress. 
+ for (ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions : ownedTopicPartitions) { + for (Integer partitionId : topicPartitions.partitions()) { + boolean stillHasRevokedPartition = member + .revokedPartitions() + .getOrDefault(topicPartitions.topicId(), Collections.emptySet()) + .contains(partitionId); + if (stillHasRevokedPartition) { + return member; + } + } + } + + // When the member has revoked all the pending partitions, it can + // transition to the next epoch (current + 1) and we can reconcile + // its state towards the latest target assignment. + return computeNextAssignment( + member.memberEpoch() + 1, Review Comment: Does this argument actually have an effect? In `computeNextAssignment()`, I only see the memberEpoch being used here: ``` if (!newPartitionsPendingRevocation.isEmpty()) { // If there are partitions to be revoked, the member remains in its current // epoch and requests the revocation of those partitions. It transitions to // the UNREVOKED_PARTITIONS state to wait until the client acknowledges the // revocation of the partitions. return new ConsumerGroupMember.Builder(member) .setState(MemberState.UNREVOKED_PARTITIONS) .updateMemberEpoch(memberEpoch) .setAssignedPartitions(newAssignedPartitions) .setRevokedPartitions(newPartitionsPendingRevocation) .build(); ``` which doesn't align with the above comments. `computeNextAssignment()` seems to just bump the member epoch to `targetAssignmentEpoch`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1211,13 +1192,99 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr // 1. The member reported its owned partitions; // 2. The member just joined or rejoined to group (epoch equals to zero); // 3. The member's assignment has been updated.
- if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) { + if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) { response.setAssignment(createResponseAssignment(updatedMember)); } return new CoordinatorResult<>(records, response); } + /** + * Reconciles the current assignment of the member if needed. + * + * @param groupId The group id. + * @param member The member to reconcile. + * @param currentPartitionEpoch The function returning the current epoch of + * a given partition. + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @param ownedTopicPartitions The list of partitions owned by the member. This + * is reported in the ConsumerGroupHeartbeat API and + * it could be null if not provided. + * @param records The list to accumulate any new records. + * @return The received member if no changes have been made; or a new + * member containing the new assignment. + */ + private ConsumerGroupMember maybeReconcile( + String groupId, + ConsumerGroupMember member, + BiFunction<Uuid, Integer, Integer> currentPartitionEpoch, + int targetAssignmentEpoch, + Assignment targetAssignment, + List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions, + List<Record> records + ) { + if (member.isReconciledTo(targetAssignmentEpoch)) { + return member; + } + + ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(targetAssignmentEpoch, targetAssignment) + .withCurrentPartitionEpoch(currentPartitionEpoch) + .withOwnedTopicPartitions(ownedTopicPartitions) + .build(); + + if (!updatedMember.equals(member)) { Review Comment: how come a reference check here is no longer sufficient? 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1211,13 +1192,99 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr // 1. The member reported its owned partitions; // 2. The member just joined or rejoined to group (epoch equals to zero); // 3. The member's assignment has been updated. - if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) { + if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) { Review Comment: i think it would be good to look into the performance of hasAssignedPartitionsChanged, given that it's called for every ConsumerGroupHeartbeatRequest ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1211,13 +1192,99 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr // 1. The member reported its owned partitions; // 2. The member just joined or rejoined to group (epoch equals to zero); // 3. The member's assignment has been updated. - if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) { + if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) { response.setAssignment(createResponseAssignment(updatedMember)); } return new CoordinatorResult<>(records, response); } + /** + * Reconciles the current assignment of the member if needed. + * + * @param groupId The group id. + * @param member The member to reconcile. + * @param currentPartitionEpoch The function returning the current epoch of + * a given partition. + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @param ownedTopicPartitions The list of partitions owned by the member. This + * is reported in the ConsumerGroupHeartbeat API and + * it could be null if not provided. 
+ * @param records The list to accumulate any new records. + * @return The received member if no changes have been made; or a new + * member containing the new assignment. + */ + private ConsumerGroupMember maybeReconcile( + String groupId, + ConsumerGroupMember member, + BiFunction<Uuid, Integer, Integer> currentPartitionEpoch, + int targetAssignmentEpoch, + Assignment targetAssignment, + List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions, + List<Record> records + ) { + if (member.isReconciledTo(targetAssignmentEpoch)) { + return member; + } + + ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(targetAssignmentEpoch, targetAssignment) + .withCurrentPartitionEpoch(currentPartitionEpoch) + .withOwnedTopicPartitions(ownedTopicPartitions) + .build(); + + if (!updatedMember.equals(member)) { + records.add(newCurrentAssignmentRecord(groupId, updatedMember)); + + log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, " + + "assignedPartitions={} and revokedPartitions={}.", + groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(), + formatAssignment(updatedMember.assignedPartitions()), formatAssignment(updatedMember.revokedPartitions())); + + if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) { + scheduleConsumerGroupRebalanceTimeout( + groupId, + updatedMember.memberId(), + updatedMember.memberEpoch(), + updatedMember.rebalanceTimeoutMs() + ); + } else { Review Comment: i think it's worth mentioning that both UNRELEASED_PARTITIONS and STABLE states indicate that the rebalance is complete here ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ########## @@ -251,195 +255,51 @@ private ConsumerGroupMember transitionToNewTargetAssignmentState() { } if (!newPartitionsPendingRevocation.isEmpty()) { - // If the partition 
pending revocation set is not empty, we transition the - // member to revoking and keep the current epoch. The transition to the new - // state is done when the member is updated. + // If there are partitions to be revoked, the member remains in its current + // epoch and requests the revocation of those partitions. It transitions to + // the UNREVOKED_PARTITIONS state to wait until the client acknowledges the + // revocation of the partitions. return new ConsumerGroupMember.Builder(member) + .setState(MemberState.UNREVOKED_PARTITIONS) + .updateMemberEpoch(memberEpoch) .setAssignedPartitions(newAssignedPartitions) - .setPartitionsPendingRevocation(newPartitionsPendingRevocation) - .setPartitionsPendingAssignment(newPartitionsPendingAssignment) - .setTargetMemberEpoch(targetAssignmentEpoch) + .setRevokedPartitions(newPartitionsPendingRevocation) Review Comment: So it seems that the definition has not changed. To me, the old name seemed more straightforward.
revokedPartitions is vague between "partitions that have been revoked" and "partitions that should be revoked" ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ########## @@ -170,72 +127,119 @@ public CurrentAssignmentBuilder withOwnedTopicPartitions( * @return A new ConsumerGroupMember or the current one. */ public ConsumerGroupMember build() { - // A new target assignment has been installed, we need to restart - // the reconciliation loop from the beginning. - if (targetAssignmentEpoch != member.targetMemberEpoch()) { - return transitionToNewTargetAssignmentState(); - } - switch (member.state()) { - // Check if the partitions have been revoked by the member. - case REVOKING: - return maybeTransitionFromRevokingToAssigningOrStable(); + case STABLE: + // When the member is in the STABLE state, we verify if a newer + // epoch (or target assignment) is available. If it is, we can + // reconcile the member towards it. Otherwise, we return. + if (member.memberEpoch() != targetAssignmentEpoch) { + return computeNextAssignment( + member.memberEpoch(), + member.assignedPartitions() + ); + } else { + return member; + } - // Check if pending partitions have been freed up. - case ASSIGNING: - return maybeTransitionFromAssigningToAssigningOrStable(); + case UNREVOKED_PARTITIONS: + // When the member is in the UNREVOKED_PARTITIONS state, we wait + // until the member has revoked the necessary partitions. They are + // considered revoked when they are not anymore reported in the + // owned partitions set in the ConsumerGroupHeartbeat API. - // Nothing to do. - case STABLE: - return member; + // If the member does not provide its owned partitions. We cannot + // progress. + if (ownedTopicPartitions == null) { + return member; + } + + // If the member provides its owned partitions. We verify if it still + // owens any of the revoked partitions. If it does, we cannot progress. 
+ for (ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions : ownedTopicPartitions) { + for (Integer partitionId : topicPartitions.partitions()) { + boolean stillHasRevokedPartition = member + .revokedPartitions() + .getOrDefault(topicPartitions.topicId(), Collections.emptySet()) + .contains(partitionId); + if (stillHasRevokedPartition) { + return member; + } + } + } + + // When the member has revoked all the pending partitions, it can + // transition to the next epoch (current + 1) and we can reconcile + // its state towards the latest target assignment. + return computeNextAssignment( + member.memberEpoch() + 1, + member.assignedPartitions() + ); + + case UNRELEASED_PARTITIONS: + // When the member is in the UNRELEASED_PARTITIONS, we reconcile the + // member towards the latest target assignment. This will assign any + // of the unreleased partitions when they become available. + return computeNextAssignment( + member.memberEpoch(), + member.assignedPartitions() + ); + + case UNKNOWN: + // We could only end up in this state if a new state is added in the + // future and the group coordinator is downgraded. In this case, the + // best option is to fence the member to force it to rejoin the group + // without any partitions and to reconcile it again from scratch. + if (ownedTopicPartitions == null || !ownedTopicPartitions.isEmpty()) { + throw new FencedMemberEpochException("The consumer group member is in a unknown state. " + + "The member must abandon all its partitions and rejoin."); + } + + return computeNextAssignment( + targetAssignmentEpoch, + member.assignedPartitions() + ); } return member; } /** - * Transitions to NewTargetAssignment state. This is a transient state where - * we compute the assigned partitions, the partitions pending revocation, - * the partitions pending assignment, and transition to the next state. + * Computes the next assignment. * * @return A new ConsumerGroupMember. 
*/ - private ConsumerGroupMember transitionToNewTargetAssignmentState() { + private ConsumerGroupMember computeNextAssignment( + int memberEpoch, + Map<Uuid, Set<Integer>> memberAssignedPartitions + ) { + boolean hasUnreleasedPartitions = false; Map<Uuid, Set<Integer>> newAssignedPartitions = new HashMap<>(); Map<Uuid, Set<Integer>> newPartitionsPendingRevocation = new HashMap<>(); Map<Uuid, Set<Integer>> newPartitionsPendingAssignment = new HashMap<>(); - // Compute the combined set of topics. Set<Uuid> allTopicIds = new HashSet<>(targetAssignment.partitions().keySet()); - allTopicIds.addAll(member.assignedPartitions().keySet()); - allTopicIds.addAll(member.partitionsPendingRevocation().keySet()); - allTopicIds.addAll(member.partitionsPendingAssignment().keySet()); + allTopicIds.addAll(memberAssignedPartitions.keySet()); for (Uuid topicId : allTopicIds) { Set<Integer> target = targetAssignment.partitions() .getOrDefault(topicId, Collections.emptySet()); - Set<Integer> currentAssignedPartitions = member.assignedPartitions() - .getOrDefault(topicId, Collections.emptySet()); - Set<Integer> currentRevokingPartitions = member.partitionsPendingRevocation() + Set<Integer> currentAssignedPartitions = memberAssignedPartitions .getOrDefault(topicId, Collections.emptySet()); - // Assigned_1 = (Assigned_0 + Pending_Revocation_0) ∩ Target - // Assigned_0 + Pending_Revocation_0 is used here because the partitions - // being revoked are still owned until the revocation is acknowledged. + // Assigned_1 = Assigned_0 ∩ Target Review Comment: I think it would be good to define what Assigned_1 and Assigned_0 represent, or to rename them to Assigned_new and Assigned_old. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org