jeffkbkim commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1491825284


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##########
@@ -346,10 +346,9 @@ public static Record newCurrentAssignmentRecord(
                 new ConsumerGroupCurrentMemberAssignmentValue()
                     .setMemberEpoch(member.memberEpoch())
                     .setPreviousMemberEpoch(member.previousMemberEpoch())
-                    .setTargetMemberEpoch(member.targetMemberEpoch())
+                    .setState(member.state().value())

Review Comment:
   was the target member epoch only used to determine the state?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -515,38 +457,24 @@ public MemberState state() {
     }
 
     /**
-     * @return The set of assigned partitions.
-     */
-    public Map<Uuid, Set<Integer>> assignedPartitions() {
-        return assignedPartitions;
-    }
-
-    /**
-     * @return The set of partitions awaiting revocation from the member.
+     * @return True of the member is in the Stable state and at the desired epoch.

Review Comment:
   nit: True if



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1211,13 +1192,99 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         // 1. The member reported its owned partitions;
         // 2. The member just joined or rejoined to group (epoch equals to zero);
         // 3. The member's assignment has been updated.
-        if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) {
+        if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) {
             response.setAssignment(createResponseAssignment(updatedMember));
         }
 
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Reconciles the current assignment of the member if needed.
+     *
+     * @param groupId               The group id.
+     * @param member                The member to reconcile.
+     * @param currentPartitionEpoch The function returning the current epoch of
+     *                              a given partition.
+     * @param targetAssignmentEpoch The target assignment epoch.
+     * @param targetAssignment      The target assignment.
+     * @param ownedTopicPartitions  The list of partitions owned by the member. This
+     *                              is reported in the ConsumerGroupHeartbeat API and
+     *                              it could be null if not provided.
+     * @param records               The list to accumulate any new records.
+     * @return The received member if no changes have been made; or a new
+     *         member containing the new assignment.
+     */
+    private ConsumerGroupMember maybeReconcile(
+        String groupId,
+        ConsumerGroupMember member,
+        BiFunction<Uuid, Integer, Integer> currentPartitionEpoch,
+        int targetAssignmentEpoch,
+        Assignment targetAssignment,
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions,
+        List<Record> records
+    ) {
+        if (member.isReconciledTo(targetAssignmentEpoch)) {
+            return member;
+        }
+
+        ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+            .withCurrentPartitionEpoch(currentPartitionEpoch)
+            .withOwnedTopicPartitions(ownedTopicPartitions)
+            .build();
+
+        if (!updatedMember.equals(member)) {
+            records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+
+            log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, "
+                     + "assignedPartitions={} and revokedPartitions={}.",
+                groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(),
+                formatAssignment(updatedMember.assignedPartitions()), formatAssignment(updatedMember.revokedPartitions()));
+
+            if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
+                scheduleConsumerGroupRebalanceTimeout(
+                    groupId,
+                    updatedMember.memberId(),
+                    updatedMember.memberEpoch(),
+                    updatedMember.rebalanceTimeoutMs()
+                );
+            } else {
+                cancelConsumerGroupRebalanceTimeout(groupId, updatedMember.memberId());
+            }
+        }
+
+        return updatedMember;
+    }
+
+    private String formatAssignment(

Review Comment:
   example output:
   ```
   [10ZNcX_URLSCRf27-VKXsw-0, 10ZNcX_URLSCRf27-VKXsw-1, 10ZNcX_URLSCRf27-VKXsw-2, NJCREtlUT8yCMZGjkcTJQQ-0, NJCREtlUT8yCMZGjkcTJQQ-1, NJCREtlUT8yCMZGjkcTJQQ-2, NJCREtlUT8yCMZGjkcTJQQ-3, NJCREtlUT8yCMZGjkcTJQQ-4, NJCREtlUT8yCMZGjkcTJQQ-5]
   ```
   the topic UUIDs make this hard to read
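
One possible shape for a more readable log line, as a minimal sketch with plain Java collections: group the partition ids per topic and print a topic name when one can be looked up. `formatAssignmentSketch`, the `topicNames` lookup and the name `foo` are hypothetical, and `String` stands in for `Uuid`; this is not the PR's `formatAssignment`.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;

class FormatAssignmentSketch {
    // Hypothetical helper: renders {topicId -> partitions} as "name:[0, 1, 2]",
    // falling back to the raw topic id when no name is known.
    static String formatAssignmentSketch(
        Map<String, Set<Integer>> assignment,
        Map<String, String> topicNames
    ) {
        // Sort by display name and by partition id so the output is deterministic.
        Map<String, Set<Integer>> byName = new TreeMap<>();
        assignment.forEach((topicId, partitions) ->
            byName.put(topicNames.getOrDefault(topicId, topicId), new TreeSet<>(partitions)));
        return byName.entrySet().stream()
            .map(e -> e.getKey() + ":" + e.getValue())
            .collect(Collectors.joining(", ", "[", "]"));
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> assignment = Map.of(
            "10ZNcX_URLSCRf27-VKXsw", Set.of(0, 1, 2),
            "NJCREtlUT8yCMZGjkcTJQQ", Set.of(0, 1, 2, 3, 4, 5));
        // Assumed lookup: only the first topic id resolves to a name here.
        Map<String, String> topicNames = Map.of("10ZNcX_URLSCRf27-VKXsw", "foo");
        System.out.println(formatAssignmentSketch(assignment, topicNames));
    }
}
```

Grouping by topic also shortens the line, since each topic id or name appears once instead of once per partition.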



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -369,20 +329,12 @@ public String toString() {
     /**
      * The partitions being revoked by this member.
      */
-    private final Map<Uuid, Set<Integer>> partitionsPendingRevocation;
-
-    /**
-     * The partitions waiting to be assigned to this
-     * member. They will be assigned when they are
-     * released by their previous owners.
-     */
-    private final Map<Uuid, Set<Integer>> partitionsPendingAssignment;
+    private final Map<Uuid, Set<Integer>> revokedPartitions;

Review Comment:
   did the definition not change?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##########
@@ -170,72 +127,119 @@ public CurrentAssignmentBuilder withOwnedTopicPartitions(
      * @return A new ConsumerGroupMember or the current one.
      */
     public ConsumerGroupMember build() {
-        // A new target assignment has been installed, we need to restart
-        // the reconciliation loop from the beginning.
-        if (targetAssignmentEpoch != member.targetMemberEpoch()) {
-            return transitionToNewTargetAssignmentState();
-        }
-
         switch (member.state()) {
-            // Check if the partitions have been revoked by the member.
-            case REVOKING:
-                return maybeTransitionFromRevokingToAssigningOrStable();
+            case STABLE:
+                // When the member is in the STABLE state, we verify if a newer
+                // epoch (or target assignment) is available. If it is, we can
+                // reconcile the member towards it. Otherwise, we return.
+                if (member.memberEpoch() != targetAssignmentEpoch) {
+                    return computeNextAssignment(
+                        member.memberEpoch(),
+                        member.assignedPartitions()
+                    );
+                } else {
+                    return member;
+                }
 
-            // Check if pending partitions have been freed up.
-            case ASSIGNING:
-                return maybeTransitionFromAssigningToAssigningOrStable();
+            case UNREVOKED_PARTITIONS:
+                // When the member is in the UNREVOKED_PARTITIONS state, we wait
+                // until the member has revoked the necessary partitions. They are
+                // considered revoked when they are not anymore reported in the
+                // owned partitions set in the ConsumerGroupHeartbeat API.
 
-            // Nothing to do.
-            case STABLE:
-                return member;
+                // If the member does not provide its owned partitions. We cannot
+                // progress.
+                if (ownedTopicPartitions == null) {
+                    return member;
+                }
+
+                // If the member provides its owned partitions. We verify if it still
+                // owens any of the revoked partitions. If it does, we cannot progress.

Review Comment:
   nit: owns



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -779,7 +779,7 @@ private void maybeUpdateGroupState() {
             newState = ASSIGNING;
         } else {
             for (ConsumerGroupMember member : members.values()) {
-                if (member.targetMemberEpoch() != targetAssignmentEpoch.get() || member.state() != ConsumerGroupMember.MemberState.STABLE) {
+                if (!member.isReconciledTo(targetAssignmentEpoch.get())) {

Review Comment:
   isReconciledTo checks `memberEpoch == targetAssignmentEpoch`, whereas
   previously we checked `member.targetMemberEpoch() != targetAssignmentEpoch.get()`
   here. did the meaning of memberEpoch change? i'm a bit confused about the
   difference between memberEpoch and targetMemberEpoch
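
A minimal sketch of the check as the Javadoc in this PR describes it ("in the Stable state and at the desired epoch"). The `Member` class below is a simplified stand-in, not the real `ConsumerGroupMember`, and the epoch values are made up.

```java
class ReconciledSketch {
    enum MemberState { STABLE, UNREVOKED_PARTITIONS, UNRELEASED_PARTITIONS, UNKNOWN }

    // Simplified stand-in for ConsumerGroupMember with only the fields the check needs.
    static final class Member {
        final MemberState state;
        final int memberEpoch;

        Member(MemberState state, int memberEpoch) {
            this.state = state;
            this.memberEpoch = memberEpoch;
        }

        // True if the member is in the STABLE state and at the desired epoch.
        boolean isReconciledTo(int targetAssignmentEpoch) {
            return state == MemberState.STABLE && memberEpoch == targetAssignmentEpoch;
        }
    }

    public static void main(String[] args) {
        // Stable and at the target epoch: reconciled.
        System.out.println(new Member(MemberState.STABLE, 5).isReconciledTo(5));
        // Stable but behind the target epoch: not reconciled.
        System.out.println(new Member(MemberState.STABLE, 4).isReconciledTo(5));
        // At the target epoch but still waiting on revocations: not reconciled.
        System.out.println(new Member(MemberState.UNREVOKED_PARTITIONS, 5).isReconciledTo(5));
    }
}
```

Under this reading, the member epoch alone carries the "which target am I on" information that `targetMemberEpoch` used to hold, with the state covering the in-between cases.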



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##########
@@ -170,72 +127,119 @@ public CurrentAssignmentBuilder withOwnedTopicPartitions(
      * @return A new ConsumerGroupMember or the current one.
      */
     public ConsumerGroupMember build() {
-        // A new target assignment has been installed, we need to restart
-        // the reconciliation loop from the beginning.
-        if (targetAssignmentEpoch != member.targetMemberEpoch()) {
-            return transitionToNewTargetAssignmentState();
-        }
-
         switch (member.state()) {
-            // Check if the partitions have been revoked by the member.
-            case REVOKING:
-                return maybeTransitionFromRevokingToAssigningOrStable();
+            case STABLE:
+                // When the member is in the STABLE state, we verify if a newer
+                // epoch (or target assignment) is available. If it is, we can
+                // reconcile the member towards it. Otherwise, we return.
+                if (member.memberEpoch() != targetAssignmentEpoch) {
+                    return computeNextAssignment(
+                        member.memberEpoch(),
+                        member.assignedPartitions()
+                    );
+                } else {
+                    return member;
+                }
 
-            // Check if pending partitions have been freed up.
-            case ASSIGNING:
-                return maybeTransitionFromAssigningToAssigningOrStable();
+            case UNREVOKED_PARTITIONS:
+                // When the member is in the UNREVOKED_PARTITIONS state, we 
wait
+                // until the member has revoked the necessary partitions. They 
are
+                // considered revoked when they are not anymore reported in the
+                // owned partitions set in the ConsumerGroupHeartbeat API.
 
-            // Nothing to do.
-            case STABLE:
-                return member;
+                // If the member does not provide its owned partitions. We 
cannot
+                // progress.
+                if (ownedTopicPartitions == null) {
+                    return member;
+                }
+
+                // If the member provides its owned partitions. We verify if 
it still
+                // owens any of the revoked partitions. If it does, we cannot 
progress.
+                for (ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions : ownedTopicPartitions) {
+                    for (Integer partitionId : topicPartitions.partitions()) {
+                        boolean stillHasRevokedPartition = member
+                            .revokedPartitions()
+                            .getOrDefault(topicPartitions.topicId(), Collections.emptySet())
+                            .contains(partitionId);
+                        if (stillHasRevokedPartition) {
+                            return member;
+                        }
+                    }
+                }
+
+                // When the member has revoked all the pending partitions, it can
+                // transition to the next epoch (current + 1) and we can reconcile
+                // its state towards the latest target assignment.
+                return computeNextAssignment(
+                    member.memberEpoch() + 1,

Review Comment:
   does this argument actually take effect?
   
   in `computeNextAssignment()` i only see the memberEpoch being used here:
   ```
           if (!newPartitionsPendingRevocation.isEmpty()) {
               // If there are partitions to be revoked, the member remains in its current
               // epoch and requests the revocation of those partitions. It transitions to
               // the UNREVOKED_PARTITIONS state to wait until the client acknowledges the
               // revocation of the partitions.
               return new ConsumerGroupMember.Builder(member)
                   .setState(MemberState.UNREVOKED_PARTITIONS)
                   .updateMemberEpoch(memberEpoch)
                   .setAssignedPartitions(newAssignedPartitions)
                   .setRevokedPartitions(newPartitionsPendingRevocation)
                   .build();
   ```
   
   which doesn't align with the above comments. computeNextAssignment() seems
   to just bump the member epoch to `targetAssignmentEpoch`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1211,13 +1192,99 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         // 1. The member reported its owned partitions;
         // 2. The member just joined or rejoined to group (epoch equals to 
zero);
         // 3. The member's assignment has been updated.
-        if (ownedTopicPartitions != null || memberEpoch == 0 || 
assignmentUpdated) {
+        if (ownedTopicPartitions != null || memberEpoch == 0 || 
hasAssignedPartitionsChanged(member, updatedMember)) {
             response.setAssignment(createResponseAssignment(updatedMember));
         }
 
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Reconciles the current assignment of the member if needed.
+     *
+     * @param groupId               The group id.
+     * @param member                The member to reconcile.
+     * @param currentPartitionEpoch The function returning the current epoch of
+     *                              a given partition.
+     * @param targetAssignmentEpoch The target assignment epoch.
+     * @param targetAssignment      The target assignment.
+     * @param ownedTopicPartitions  The list of partitions owned by the 
member. This
+     *                              is reported in the ConsumerGroupHeartbeat 
API and
+     *                              it could be null if not provided.
+     * @param records               The list to accumulate any new records.
+     * @return The received member if no changes have been made; or a new
+     *         member containing the new assignment.
+     */
+    private ConsumerGroupMember maybeReconcile(
+        String groupId,
+        ConsumerGroupMember member,
+        BiFunction<Uuid, Integer, Integer> currentPartitionEpoch,
+        int targetAssignmentEpoch,
+        Assignment targetAssignment,
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions,
+        List<Record> records
+    ) {
+        if (member.isReconciledTo(targetAssignmentEpoch)) {
+            return member;
+        }
+
+        ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+            .withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+            .withCurrentPartitionEpoch(currentPartitionEpoch)
+            .withOwnedTopicPartitions(ownedTopicPartitions)
+            .build();
+
+        if (!updatedMember.equals(member)) {

Review Comment:
   how come a reference check here is no longer sufficient?
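
To illustrate the difference behind this question: a reference check only detects "the builder returned the same object", while `equals` also treats a freshly built member with identical fields as unchanged. The sketch below uses a hypothetical, simplified `Member` class and assumes a builder that can return a new instance with equal fields; whether the PR's builder actually does so is not confirmed here.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;

class EqualsVsReferenceSketch {
    // Hypothetical, simplified member: immutable, with value-based equals/hashCode.
    static final class Member {
        final int memberEpoch;
        final Map<String, Set<Integer>> assignedPartitions;

        Member(int memberEpoch, Map<String, Set<Integer>> assignedPartitions) {
            this.memberEpoch = memberEpoch;
            this.assignedPartitions = assignedPartitions;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Member)) return false;
            Member that = (Member) o;
            return memberEpoch == that.memberEpoch
                && assignedPartitions.equals(that.assignedPartitions);
        }

        @Override
        public int hashCode() {
            return Objects.hash(memberEpoch, assignedPartitions);
        }
    }

    public static void main(String[] args) {
        Member member = new Member(5, Map.of("topic-a", Set.of(0, 1)));
        // Assumed builder behavior: a new instance whose fields ended up identical.
        Member rebuilt = new Member(5, Map.of("topic-a", Set.of(0, 1)));

        // Reference check: reports a change because the objects differ.
        System.out.println(rebuilt != member);
        // Value check: reports no change because all fields are equal.
        System.out.println(!rebuilt.equals(member));
    }
}
```

With a reference check, the rebuilt-but-identical case would write a record and log a state change even though nothing differs.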



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1211,13 +1192,99 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         // 1. The member reported its owned partitions;
         // 2. The member just joined or rejoined to group (epoch equals to 
zero);
         // 3. The member's assignment has been updated.
-        if (ownedTopicPartitions != null || memberEpoch == 0 || 
assignmentUpdated) {
+        if (ownedTopicPartitions != null || memberEpoch == 0 || 
hasAssignedPartitionsChanged(member, updatedMember)) {

Review Comment:
   i think it would be good to look into the performance of
   hasAssignedPartitionsChanged, given that it's called for every
   ConsumerGroupHeartbeatRequest



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1211,13 +1192,99 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         // 1. The member reported its owned partitions;
         // 2. The member just joined or rejoined to group (epoch equals to 
zero);
         // 3. The member's assignment has been updated.
-        if (ownedTopicPartitions != null || memberEpoch == 0 || 
assignmentUpdated) {
+        if (ownedTopicPartitions != null || memberEpoch == 0 || 
hasAssignedPartitionsChanged(member, updatedMember)) {
             response.setAssignment(createResponseAssignment(updatedMember));
         }
 
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Reconciles the current assignment of the member if needed.
+     *
+     * @param groupId               The group id.
+     * @param member                The member to reconcile.
+     * @param currentPartitionEpoch The function returning the current epoch of
+     *                              a given partition.
+     * @param targetAssignmentEpoch The target assignment epoch.
+     * @param targetAssignment      The target assignment.
+     * @param ownedTopicPartitions  The list of partitions owned by the 
member. This
+     *                              is reported in the ConsumerGroupHeartbeat 
API and
+     *                              it could be null if not provided.
+     * @param records               The list to accumulate any new records.
+     * @return The received member if no changes have been made; or a new
+     *         member containing the new assignment.
+     */
+    private ConsumerGroupMember maybeReconcile(
+        String groupId,
+        ConsumerGroupMember member,
+        BiFunction<Uuid, Integer, Integer> currentPartitionEpoch,
+        int targetAssignmentEpoch,
+        Assignment targetAssignment,
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions,
+        List<Record> records
+    ) {
+        if (member.isReconciledTo(targetAssignmentEpoch)) {
+            return member;
+        }
+
+        ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+            .withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+            .withCurrentPartitionEpoch(currentPartitionEpoch)
+            .withOwnedTopicPartitions(ownedTopicPartitions)
+            .build();
+
+        if (!updatedMember.equals(member)) {
+            records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+
+            log.info("[GroupId {}] Member {} new assignment state: epoch={}, 
previousEpoch={}, state={}, "
+                     + "assignedPartitions={} and revokedPartitions={}.",
+                groupId, updatedMember.memberId(), 
updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), 
updatedMember.state(),
+                formatAssignment(updatedMember.assignedPartitions()), 
formatAssignment(updatedMember.revokedPartitions()));
+
+            if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
+                scheduleConsumerGroupRebalanceTimeout(
+                    groupId,
+                    updatedMember.memberId(),
+                    updatedMember.memberEpoch(),
+                    updatedMember.rebalanceTimeoutMs()
+                );
+            } else {

Review Comment:
   i think it's worth mentioning that both UNRELEASED_PARTITIONS and STABLE
   states indicate that the rebalance is complete here



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##########
@@ -251,195 +255,51 @@ private ConsumerGroupMember transitionToNewTargetAssignmentState() {
         }
 
         if (!newPartitionsPendingRevocation.isEmpty()) {
-            // If the partition pending revocation set is not empty, we transition the
-            // member to revoking and keep the current epoch. The transition to the new
-            // state is done when the member is updated.
+            // If there are partitions to be revoked, the member remains in its current
+            // epoch and requests the revocation of those partitions. It transitions to
+            // the UNREVOKED_PARTITIONS state to wait until the client acknowledges the

Review Comment:
   "acknowledges the revocation of the partitions" -> this acknowledgement is 
done by removing these partitions from `ownedPartitions` right?
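
For reference, the acknowledgement check in the UNREVOKED_PARTITIONS branch of `build()` can be sketched in isolation as follows. `hasAcknowledgedRevocation` is a hypothetical name, `String` stands in for `Uuid`, and the nested `TopicPartitions` class is a stand-in for `ConsumerGroupHeartbeatRequestData.TopicPartitions`.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RevocationAckSketch {
    // Stand-in for ConsumerGroupHeartbeatRequestData.TopicPartitions.
    static final class TopicPartitions {
        final String topicId;
        final List<Integer> partitions;

        TopicPartitions(String topicId, List<Integer> partitions) {
            this.topicId = topicId;
            this.partitions = partitions;
        }
    }

    // Returns true once none of the partitions pending revocation is still
    // reported as owned. A null list means the member did not report its
    // owned partitions, so nothing can be considered acknowledged.
    static boolean hasAcknowledgedRevocation(
        Map<String, Set<Integer>> revokedPartitions,
        List<TopicPartitions> ownedTopicPartitions
    ) {
        if (ownedTopicPartitions == null) {
            return false;
        }
        for (TopicPartitions topicPartitions : ownedTopicPartitions) {
            Set<Integer> pending = revokedPartitions
                .getOrDefault(topicPartitions.topicId, Collections.emptySet());
            for (Integer partitionId : topicPartitions.partitions) {
                if (pending.contains(partitionId)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> revoked = Map.of("topic-a", Set.of(2));
        // Still owns partition 2, which is pending revocation.
        System.out.println(hasAcknowledgedRevocation(
            revoked, List.of(new TopicPartitions("topic-a", List.of(0, 1, 2)))));
        // Partition 2 is no longer reported as owned.
        System.out.println(hasAcknowledgedRevocation(
            revoked, List.of(new TopicPartitions("topic-a", List.of(0, 1)))));
    }
}
```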



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##########
@@ -251,195 +255,51 @@ private ConsumerGroupMember 
transitionToNewTargetAssignmentState() {
         }
 
         if (!newPartitionsPendingRevocation.isEmpty()) {
-            // If the partition pending revocation set is not empty, we 
transition the
-            // member to revoking and keep the current epoch. The transition 
to the new
-            // state is done when the member is updated.
+            // If there are partitions to be revoked, the member remains in 
its current
+            // epoch and requests the revocation of those partitions. It 
transitions to
+            // the UNREVOKED_PARTITIONS state to wait until the client 
acknowledges the
+            // revocation of the partitions.
             return new ConsumerGroupMember.Builder(member)
+                .setState(MemberState.UNREVOKED_PARTITIONS)
+                .updateMemberEpoch(memberEpoch)
                 .setAssignedPartitions(newAssignedPartitions)
-                .setPartitionsPendingRevocation(newPartitionsPendingRevocation)
-                .setPartitionsPendingAssignment(newPartitionsPendingAssignment)
-                .setTargetMemberEpoch(targetAssignmentEpoch)
+                .setRevokedPartitions(newPartitionsPendingRevocation)

Review Comment:
   so it seems that the definition has not changed. to me, the old name seemed
   more straightforward.
   
   revokedPartitions is ambiguous: it could mean "partitions that have been
   revoked" or "partitions that should be revoked"



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##########
@@ -170,72 +127,119 @@ public CurrentAssignmentBuilder withOwnedTopicPartitions(
      * @return A new ConsumerGroupMember or the current one.
      */
     public ConsumerGroupMember build() {
-        // A new target assignment has been installed, we need to restart
-        // the reconciliation loop from the beginning.
-        if (targetAssignmentEpoch != member.targetMemberEpoch()) {
-            return transitionToNewTargetAssignmentState();
-        }
-
         switch (member.state()) {
-            // Check if the partitions have been revoked by the member.
-            case REVOKING:
-                return maybeTransitionFromRevokingToAssigningOrStable();
+            case STABLE:
+                // When the member is in the STABLE state, we verify if a newer
+                // epoch (or target assignment) is available. If it is, we can
+                // reconcile the member towards it. Otherwise, we return.
+                if (member.memberEpoch() != targetAssignmentEpoch) {
+                    return computeNextAssignment(
+                        member.memberEpoch(),
+                        member.assignedPartitions()
+                    );
+                } else {
+                    return member;
+                }
 
-            // Check if pending partitions have been freed up.
-            case ASSIGNING:
-                return maybeTransitionFromAssigningToAssigningOrStable();
+            case UNREVOKED_PARTITIONS:
+                // When the member is in the UNREVOKED_PARTITIONS state, we 
wait
+                // until the member has revoked the necessary partitions. They 
are
+                // considered revoked when they are not anymore reported in the
+                // owned partitions set in the ConsumerGroupHeartbeat API.
 
-            // Nothing to do.
-            case STABLE:
-                return member;
+                // If the member does not provide its owned partitions. We 
cannot
+                // progress.
+                if (ownedTopicPartitions == null) {
+                    return member;
+                }
+
+                // If the member provides its owned partitions. We verify if 
it still
+                // owens any of the revoked partitions. If it does, we cannot 
progress.
+                for (ConsumerGroupHeartbeatRequestData.TopicPartitions 
topicPartitions : ownedTopicPartitions) {
+                    for (Integer partitionId : topicPartitions.partitions()) {
+                        boolean stillHasRevokedPartition = member
+                            .revokedPartitions()
+                            .getOrDefault(topicPartitions.topicId(), 
Collections.emptySet())
+                            .contains(partitionId);
+                        if (stillHasRevokedPartition) {
+                            return member;
+                        }
+                    }
+                }
+
+                // When the member has revoked all the pending partitions, it 
can
+                // transition to the next epoch (current + 1) and we can 
reconcile
+                // its state towards the latest target assignment.
+                return computeNextAssignment(
+                    member.memberEpoch() + 1,
+                    member.assignedPartitions()
+                );
+
+            case UNRELEASED_PARTITIONS:
+                // When the member is in the UNRELEASED_PARTITIONS, we reconcile the
+                // member towards the latest target assignment. This will assign any
+                // of the unreleased partitions when they become available.
+                return computeNextAssignment(
+                    member.memberEpoch(),
+                    member.assignedPartitions()
+                );
+
+            case UNKNOWN:
+                // We could only end up in this state if a new state is added in the
+                // future and the group coordinator is downgraded. In this case, the
+                // best option is to fence the member to force it to rejoin the group
+                // without any partitions and to reconcile it again from scratch.
+                if (ownedTopicPartitions == null || !ownedTopicPartitions.isEmpty()) {
+                    throw new FencedMemberEpochException("The consumer group member is in a unknown state. "
+                        + "The member must abandon all its partitions and rejoin.");
+                }
+
+                return computeNextAssignment(
+                    targetAssignmentEpoch,
+                    member.assignedPartitions()
+                );
         }
 
         return member;
     }
 
     /**
-     * Transitions to NewTargetAssignment state. This is a transient state where
-     * we compute the assigned partitions, the partitions pending revocation,
-     * the partitions pending assignment, and transition to the next state.
+     * Computes the next assignment.
      *
      * @return A new ConsumerGroupMember.
      */
-    private ConsumerGroupMember transitionToNewTargetAssignmentState() {
+    private ConsumerGroupMember computeNextAssignment(
+        int memberEpoch,
+        Map<Uuid, Set<Integer>> memberAssignedPartitions
+    ) {
+        boolean hasUnreleasedPartitions = false;
         Map<Uuid, Set<Integer>> newAssignedPartitions = new HashMap<>();
         Map<Uuid, Set<Integer>> newPartitionsPendingRevocation = new HashMap<>();
         Map<Uuid, Set<Integer>> newPartitionsPendingAssignment = new HashMap<>();
 
-        // Compute the combined set of topics.
         Set<Uuid> allTopicIds = new HashSet<>(targetAssignment.partitions().keySet());
-        allTopicIds.addAll(member.assignedPartitions().keySet());
-        allTopicIds.addAll(member.partitionsPendingRevocation().keySet());
-        allTopicIds.addAll(member.partitionsPendingAssignment().keySet());
+        allTopicIds.addAll(memberAssignedPartitions.keySet());
 
         for (Uuid topicId : allTopicIds) {
             Set<Integer> target = targetAssignment.partitions()
                 .getOrDefault(topicId, Collections.emptySet());
-            Set<Integer> currentAssignedPartitions = member.assignedPartitions()
-                .getOrDefault(topicId, Collections.emptySet());
-            Set<Integer> currentRevokingPartitions = member.partitionsPendingRevocation()
+            Set<Integer> currentAssignedPartitions = memberAssignedPartitions
                 .getOrDefault(topicId, Collections.emptySet());
 
-            // Assigned_1 = (Assigned_0 + Pending_Revocation_0) ∩ Target
-            // Assigned_0 + Pending_Revocation_0 is used here because the partitions
-            // being revoked are still owned until the revocation is acknowledged.
+            // Assigned_1 = Assigned_0 ∩ Target

Review Comment:
   i think it would be good to define what Assigned_1 and Assigned_0
   represent, or to use Assigned_new and Assigned_old
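
A small worked example of the formula, using the Assigned_old / Assigned_new naming suggested here: Assigned_new = Assigned_old ∩ Target, computed per topic. The partition numbers are made up, and plain `Set<Integer>` values stand in for one topic's entry in the assignment maps.

```java
import java.util.Set;
import java.util.TreeSet;

class AssignedIntersectionSketch {
    // Assigned_new = Assigned_old intersect Target: keep only the partitions
    // that the member currently has and that are still in the target.
    static Set<Integer> intersect(Set<Integer> assignedOld, Set<Integer> target) {
        Set<Integer> assignedNew = new TreeSet<>(assignedOld);
        assignedNew.retainAll(target);
        return assignedNew;
    }

    public static void main(String[] args) {
        Set<Integer> assignedOld = Set.of(0, 1, 2); // what the member holds before this step
        Set<Integer> target = Set.of(1, 2, 3);      // what the target assignment gives it
        // Partition 0 drops out; partition 3 is not yet held, so it is not included.
        System.out.println(intersect(assignedOld, target)); // prints [1, 2]
    }
}
```

Here the subscript 0 reads as "before this reconciliation step" and 1 as "after it", which is the distinction the `_old` and `_new` suffixes would make explicit.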



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

