jeffkbkim commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1496700583


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1211,13 +1192,99 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         // 1. The member reported its owned partitions;

Review Comment:
   Do we need this condition because we can only compute a valid assignment if we're given the partitions a member owns?
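   To frame the question, the three conditions listed in the heartbeat handler (quoted in full in the next hunk) can be modeled as one predicate. This is a minimal sketch, not the coordinator's code. `ResponseAssignmentCheck` and `shouldIncludeAssignment` are hypothetical names, and topic ids are plain `String`s instead of `Uuid`. The body of `hasAssignedPartitionsChanged` is assumed to be a before/after comparison of the assigned partitions.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical model of the "send the assignment back?" check in the heartbeat
// handler. Topic ids are Strings and owned partitions an opaque list, for brevity.
final class ResponseAssignmentCheck {

    // Assumption: compares the member's assigned partitions before and after
    // reconciliation (the real helper's body is not part of this diff).
    static boolean hasAssignedPartitionsChanged(
        Map<String, Set<Integer>> before,
        Map<String, Set<Integer>> after
    ) {
        return !Objects.equals(before, after);
    }

    static boolean shouldIncludeAssignment(
        List<?> ownedTopicPartitions,          // null if the client did not report them
        int memberEpoch,
        Map<String, Set<Integer>> assignedBefore,
        Map<String, Set<Integer>> assignedAfter
    ) {
        return ownedTopicPartitions != null    // 1. owned partitions were reported
            || memberEpoch == 0                // 2. the member joined or rejoined
            || hasAssignedPartitionsChanged(assignedBefore, assignedAfter); // 3. assignment changed
    }
}
```

   Under this model, a heartbeat that carries owned partitions always gets the assignment back, even when nothing changed. That is the case the question is about.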



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1807,9 +1776,9 @@ public void testReconciliationProcess() {
 
         assertRecordsEquals(Collections.singletonList(
             RecordHelpers.newCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId1)
+                .setState(MemberState.UNREVOKED_PARTITIONS)
                 .setMemberEpoch(10)
-                .setPreviousMemberEpoch(9)
-                .setTargetMemberEpoch(11)
+                .setPreviousMemberEpoch(10)

Review Comment:
   This was bumped from 9 to 10. This seems right because Member 1 was already at epoch 10 at L1656. How did the test pass before?
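   One possible explanation for the new `setPreviousMemberEpoch(10)` is the following assumed rule: every epoch update copies the current epoch into `previousMemberEpoch` before writing the new one. Under that rule, a member already at epoch 10 that stays at epoch 10 while entering UNREVOKED_PARTITIONS ends up with both fields at 10. The sketch below models only that assumed rule. `MemberEpochs` and `updateMemberEpoch` are hypothetical names, not the PR's API.

```java
// Hypothetical model of the two epoch fields stored in the current-assignment
// record. Assumption: every epoch update records the epoch the member was at
// as previousMemberEpoch, even when the epoch value itself does not change.
final class MemberEpochs {
    final int memberEpoch;
    final int previousMemberEpoch;

    MemberEpochs(int memberEpoch, int previousMemberEpoch) {
        this.memberEpoch = memberEpoch;
        this.previousMemberEpoch = previousMemberEpoch;
    }

    MemberEpochs updateMemberEpoch(int newEpoch) {
        // previous <- current, current <- new
        return new MemberEpochs(newEpoch, memberEpoch);
    }

    @Override
    public String toString() {
        return "epoch=" + memberEpoch + ", previousEpoch=" + previousMemberEpoch;
    }
}
```

   `new MemberEpochs(10, 9).updateMemberEpoch(10)` yields epoch 10 and previous epoch 10, which is the pair asserted in the updated test.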



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1211,13 +1192,99 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         // 1. The member reported its owned partitions;
         // 2. The member just joined or rejoined to group (epoch equals to zero);
         // 3. The member's assignment has been updated.
-        if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) {
+        if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) {
             response.setAssignment(createResponseAssignment(updatedMember));
         }
 
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Reconciles the current assignment of the member if needed.
+     *
+     * @param groupId               The group id.
+     * @param member                The member to reconcile.
+     * @param currentPartitionEpoch The function returning the current epoch of
+     *                              a given partition.
+     * @param targetAssignmentEpoch The target assignment epoch.
+     * @param targetAssignment      The target assignment.
+     * @param ownedTopicPartitions  The list of partitions owned by the member. This
+     *                              is reported in the ConsumerGroupHeartbeat API and
+     *                              it could be null if not provided.
+     * @param records               The list to accumulate any new records.
+     * @return The received member if no changes have been made; or a new
+     *         member containing the new assignment.
+     */
+    private ConsumerGroupMember maybeReconcile(
+        String groupId,
+        ConsumerGroupMember member,
+        BiFunction<Uuid, Integer, Integer> currentPartitionEpoch,
+        int targetAssignmentEpoch,
+        Assignment targetAssignment,
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions,
+        List<Record> records
+    ) {
+        if (member.isReconciledTo(targetAssignmentEpoch)) {
+            return member;
+        }
+
+        ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+            .withCurrentPartitionEpoch(currentPartitionEpoch)
+            .withOwnedTopicPartitions(ownedTopicPartitions)
+            .build();
+
+        if (!updatedMember.equals(member)) {
+            records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+
+            log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, "
+                     + "assignedPartitions={} and revokedPartitions={}.",
+                groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(),
+                formatAssignment(updatedMember.assignedPartitions()), formatAssignment(updatedMember.revokedPartitions()));
+
+            if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
+                scheduleConsumerGroupRebalanceTimeout(
+                    groupId,
+                    updatedMember.memberId(),
+                    updatedMember.memberEpoch(),
+                    updatedMember.rebalanceTimeoutMs()
+                );
+            } else {

Review Comment:
   Then why do we "cancel consumer group rebalance timeout"? So I think you're saying that the rebalance timeout is actually the revocation timeout. Is this correct?
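   In the full version of this method, the timer is armed only when the member enters UNREVOKED_PARTITIONS. It is cancelled whenever the member's assignment moves to any other state, which is why it behaves like a revocation timeout. Below is a minimal sketch of that bookkeeping. It assumes that an expiring timer fences the member only if the member is still at the epoch the timer was armed for; this assumption is inferred from the `memberEpoch` argument passed to `scheduleConsumerGroupRebalanceTimeout`. `RevocationTimeouts` and its methods are hypothetical, and no real scheduler is involved.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical bookkeeping for the timeout scheduled in maybeReconcile. No real
// timer is used: "armed" just records the epoch the timeout was scheduled for.
final class RevocationTimeouts {
    enum MemberState { STABLE, UNREVOKED_PARTITIONS, UNRELEASED_PARTITIONS }

    private final Map<String, Integer> armedAtEpoch = new HashMap<>();

    // Mirrors the if/else in the hunk: schedule on UNREVOKED_PARTITIONS, cancel otherwise.
    void onNewAssignment(String memberId, MemberState state, int memberEpoch) {
        if (state == MemberState.UNREVOKED_PARTITIONS) {
            armedAtEpoch.put(memberId, memberEpoch);
        } else {
            armedAtEpoch.remove(memberId);
        }
    }

    boolean isArmed(String memberId) {
        return armedAtEpoch.containsKey(memberId);
    }

    // Assumed expiry rule: fence only if the member never left the armed epoch.
    boolean shouldFenceOnExpiry(String memberId, int currentMemberEpoch) {
        Integer epoch = armedAtEpoch.remove(memberId);
        return epoch != null && epoch == currentMemberEpoch;
    }
}
```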



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -779,7 +779,7 @@ private void maybeUpdateGroupState() {
             newState = ASSIGNING;
         } else {
             for (ConsumerGroupMember member : members.values()) {
-                if (member.targetMemberEpoch() != targetAssignmentEpoch.get() || member.state() != ConsumerGroupMember.MemberState.STABLE) {
+                if (!member.isReconciledTo(targetAssignmentEpoch.get())) {

Review Comment:
   Ah, it's just the negation. Thanks.
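   For reference, the equivalence is De Morgan's law: `!(a && b) == (!a || !b)`. The small sketch below checks it over a few states and epochs. It assumes that `isReconciledTo` means "stable and at the target epoch" and that `memberEpoch` takes the place of the removed `targetMemberEpoch`. Both the class and the method bodies are hypothetical.

```java
// Hypothetical check that the new condition is the negation of the old one.
final class ReconciledCheck {
    enum MemberState { STABLE, UNREVOKED_PARTITIONS, UNRELEASED_PARTITIONS, UNKNOWN }

    // Assumed meaning of isReconciledTo: stable AND at the target epoch.
    static boolean isReconciledTo(MemberState state, int memberEpoch, int targetAssignmentEpoch) {
        return state == MemberState.STABLE && memberEpoch == targetAssignmentEpoch;
    }

    // Shape of the removed condition, with memberEpoch in place of targetMemberEpoch.
    static boolean oldCondition(MemberState state, int memberEpoch, int targetAssignmentEpoch) {
        return memberEpoch != targetAssignmentEpoch || state != MemberState.STABLE;
    }
}
```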



##########
group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentValue.json:
##########
@@ -24,20 +24,12 @@
       "about": "The current member epoch that is expected from the member in the heartbeat request." },
     { "name": "PreviousMemberEpoch", "versions": "0+", "type": "int32",
       "about": "If the last epoch bump is lost before reaching the member, the member will retry with the previous epoch." },
-    { "name": "TargetMemberEpoch", "versions": "0+", "type": "int32",
-      "about": "The target epoch corresponding to the assignment used to compute the AssignedPartitions, the PartitionsPendingRevocation and the PartitionsPendingAssignment fields." },
+    { "name": "State", "versions": "0+", "type": "int8",
+      "about": "The member state. See ConsumerGroupMember.MemberState for the possible values." },
     { "name": "AssignedPartitions", "versions": "0+", "type": "[]TopicPartitions",
       "about": "The partitions assigned to (or owned by) this member." },
     { "name": "PartitionsPendingRevocation", "versions": "0+", "type": "[]TopicPartitions",
-      "about": "The partitions that must be revoked by this member." },
-    { "name": "PartitionsPendingAssignment", "versions": "0+", "type": "[]TopicPartitions",
-      "about": "The partitions that will be assigned to this member when they are freed up by their current owners." },
-    { "name": "Error", "versions": "0+", "type": "int8",
-      "about": "The error reported by the assignor." },
-    { "name": "MetadataVersion", "versions": "0+", "type": "int16",
-      "about": "The version of the metadata bytes." },
-    { "name": "MetadataBytes", "versions": "0+", "type": "bytes",
-      "about": "The metadata bytes." }

Review Comment:
   What's the reason for removing these?
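   For context on the replacement field: `State` is a single `int8` whose values are defined by `ConsumerGroupMember.MemberState`. The UNKNOWN branch in `CurrentAssignmentBuilder#build` covers a value written by a newer coordinator and then read after a downgrade. The sketch below shows such an encoding with an UNKNOWN fallback. The byte values and the `PersistedMemberState` name are hypothetical, not the ones in the PR.

```java
// Hypothetical byte encoding for the State field; the real values live in
// ConsumerGroupMember.MemberState.
enum PersistedMemberState {
    UNREVOKED_PARTITIONS((byte) 0),
    UNRELEASED_PARTITIONS((byte) 1),
    STABLE((byte) 2),
    UNKNOWN((byte) -1);

    final byte value;

    PersistedMemberState(byte value) {
        this.value = value;
    }

    // A coordinator that does not recognize a value (e.g. one written by a newer
    // version before a downgrade) maps it to UNKNOWN instead of failing.
    static PersistedMemberState fromValue(byte value) {
        for (PersistedMemberState state : values()) {
            if (state.value == value) {
                return state;
            }
        }
        return UNKNOWN;
    }
}
```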



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##########
@@ -170,72 +127,119 @@ public CurrentAssignmentBuilder withOwnedTopicPartitions(
      * @return A new ConsumerGroupMember or the current one.
      */
     public ConsumerGroupMember build() {
-        // A new target assignment has been installed, we need to restart
-        // the reconciliation loop from the beginning.
-        if (targetAssignmentEpoch != member.targetMemberEpoch()) {
-            return transitionToNewTargetAssignmentState();
-        }
-
         switch (member.state()) {
-            // Check if the partitions have been revoked by the member.
-            case REVOKING:
-                return maybeTransitionFromRevokingToAssigningOrStable();
+            case STABLE:
+                // When the member is in the STABLE state, we verify if a newer
+                // epoch (or target assignment) is available. If it is, we can
+                // reconcile the member towards it. Otherwise, we return.
+                if (member.memberEpoch() != targetAssignmentEpoch) {
+                    return computeNextAssignment(
+                        member.memberEpoch(),
+                        member.assignedPartitions()
+                    );
+                } else {
+                    return member;
+                }
 
-            // Check if pending partitions have been freed up.
-            case ASSIGNING:
-                return maybeTransitionFromAssigningToAssigningOrStable();
+            case UNREVOKED_PARTITIONS:
+                // When the member is in the UNREVOKED_PARTITIONS state, we wait
+                // until the member has revoked the necessary partitions. They are
+                // considered revoked when they are not anymore reported in the
+                // owned partitions set in the ConsumerGroupHeartbeat API.
 
-            // Nothing to do.
-            case STABLE:
-                return member;
+                // If the member does not provide its owned partitions. We cannot
+                // progress.
+                if (ownedTopicPartitions == null) {
+                    return member;
+                }
+
+                // If the member provides its owned partitions. We verify if it still
+                // owns any of the revoked partitions. If it does, we cannot progress.
+                for (ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions : ownedTopicPartitions) {
+                    for (Integer partitionId : topicPartitions.partitions()) {
+                        boolean stillHasRevokedPartition = member
+                            .setPartitionsPendingRevocation()
+                            .getOrDefault(topicPartitions.topicId(), Collections.emptySet())
+                            .contains(partitionId);
+                        if (stillHasRevokedPartition) {
+                            return member;
+                        }
+                    }
+                }
+
+                // When the member has revoked all the pending partitions, it can
+                // transition to the next epoch (current + 1) and we can reconcile
+                // its state towards the latest target assignment.
+                return computeNextAssignment(
+                    member.memberEpoch() + 1,
+                    member.assignedPartitions()
+                );
+
+            case UNRELEASED_PARTITIONS:
+                // When the member is in the UNRELEASED_PARTITIONS, we reconcile the
+                // member towards the latest target assignment. This will assign any
+                // of the unreleased partitions when they become available.
+                return computeNextAssignment(
+                    member.memberEpoch(),
+                    member.assignedPartitions()
+                );
+
+            case UNKNOWN:
+                // We could only end up in this state if a new state is added in the
+                // future and the group coordinator is downgraded. In this case, the
+                // best option is to fence the member to force it to rejoin the group
+                // without any partitions and to reconcile it again from scratch.
+                if (ownedTopicPartitions == null || !ownedTopicPartitions.isEmpty()) {
+                    throw new FencedMemberEpochException("The consumer group member is in an unknown state. "
+                        + "The member must abandon all its partitions and rejoin.");
+                }
+
+                return computeNextAssignment(
+                    targetAssignmentEpoch,
+                    member.assignedPartitions()
+                );
         }
 
         return member;
     }
 
     /**
-     * Transitions to NewTargetAssignment state. This is a transient state where
-     * we compute the assigned partitions, the partitions pending revocation,
-     * the partitions pending assignment, and transition to the next state.
+     * Computes the next assignment.
      *
      * @return A new ConsumerGroupMember.
      */
-    private ConsumerGroupMember transitionToNewTargetAssignmentState() {
+    private ConsumerGroupMember computeNextAssignment(

Review Comment:
   nit: javadocs are missing for the parameters
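   As an illustration of the missing `@param` tags, here is a documented, simplified model of what `computeNextAssignment` appears to do, based on how `build()` calls it. The method body is not part of this hunk, so everything here is an assumption:
   - the set arithmetic (revoke first, then grant only unowned partitions);
   - the `-1` convention for free partitions;
   - passing the builder's target assignment and partition-epoch function as explicit parameters.
   `NextAssignmentModel` is a hypothetical name, and topic ids are `String`s.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

// Hypothetical, simplified model of computeNextAssignment. Topic ids are Strings,
// and the builder's fields are passed in as explicit parameters.
final class NextAssignmentModel {
    enum MemberState { STABLE, UNREVOKED_PARTITIONS, UNRELEASED_PARTITIONS }

    static final class Result {
        final MemberState state;
        final int memberEpoch;
        final Map<String, Set<Integer>> assignedPartitions;
        final Map<String, Set<Integer>> partitionsPendingRevocation;

        Result(MemberState state, int memberEpoch,
               Map<String, Set<Integer>> assignedPartitions,
               Map<String, Set<Integer>> partitionsPendingRevocation) {
            this.state = state;
            this.memberEpoch = memberEpoch;
            this.assignedPartitions = assignedPartitions;
            this.partitionsPendingRevocation = partitionsPendingRevocation;
        }
    }

    /**
     * Computes the next assignment of the member.
     *
     * @param memberEpoch              The epoch the member stays at if it still has
     *                                 partitions to revoke.
     * @param memberAssignedPartitions The partitions currently assigned to the member.
     * @param targetPartitions         The member's partitions in the target assignment.
     * @param targetAssignmentEpoch    The epoch of the target assignment.
     * @param currentPartitionEpoch    Returns the epoch of the member owning a partition,
     *                                 or -1 if it is free (an assumed convention).
     * @return The next state, epoch, and partitions of the member.
     */
    static Result computeNextAssignment(
        int memberEpoch,
        Map<String, Set<Integer>> memberAssignedPartitions,
        Map<String, Set<Integer>> targetPartitions,
        int targetAssignmentEpoch,
        BiFunction<String, Integer, Integer> currentPartitionEpoch
    ) {
        Map<String, Set<Integer>> assigned = new HashMap<>();
        Map<String, Set<Integer>> pendingRevocation = new HashMap<>();
        Map<String, Set<Integer>> assignable = new HashMap<>();
        boolean hasUnreleasedPartitions = false;

        Set<String> topics = new HashSet<>(memberAssignedPartitions.keySet());
        topics.addAll(targetPartitions.keySet());
        for (String topic : topics) {
            Set<Integer> current = memberAssignedPartitions.getOrDefault(topic, Set.of());
            Set<Integer> target = targetPartitions.getOrDefault(topic, Set.of());

            Set<Integer> kept = new HashSet<>(current);      // intersection of current and target
            kept.retainAll(target);
            Set<Integer> revoked = new HashSet<>(current);   // current minus target
            revoked.removeAll(target);
            Set<Integer> free = new HashSet<>();             // target minus current, if unowned
            for (Integer partition : target) {
                if (current.contains(partition)) continue;
                if (currentPartitionEpoch.apply(topic, partition) == -1) free.add(partition);
                else hasUnreleasedPartitions = true;
            }

            if (!kept.isEmpty()) assigned.put(topic, kept);
            if (!revoked.isEmpty()) pendingRevocation.put(topic, revoked);
            if (!free.isEmpty()) assignable.put(topic, free);
        }

        if (!pendingRevocation.isEmpty()) {
            // Revoke first: the member stays at memberEpoch and gets nothing new yet.
            return new Result(MemberState.UNREVOKED_PARTITIONS, memberEpoch, assigned, pendingRevocation);
        }

        // Nothing left to revoke: move to the target epoch and add the free partitions.
        assignable.forEach((t, ps) -> assigned.computeIfAbsent(t, k -> new HashSet<>()).addAll(ps));
        MemberState state = hasUnreleasedPartitions
            ? MemberState.UNRELEASED_PARTITIONS
            : MemberState.STABLE;
        return new Result(state, targetAssignmentEpoch, assigned, Map.of());
    }
}
```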



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1168,36 +1170,15 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
 
         // 3. Reconcile the member's assignment with the target assignment. This is only required if
         // the member is not stable or if a new target assignment has been installed.

Review Comment:
   Should we update the comment to say "if the member is not stable, or if it is stable and a new target has been installed"?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -524,31 +473,10 @@ public Map<Uuid, Set<Integer>> assignedPartitions() {
     /**
      * @return The set of partitions awaiting revocation from the member.
      */
-    public Map<Uuid, Set<Integer>> partitionsPendingRevocation() {
+    public Map<Uuid, Set<Integer>> setPartitionsPendingRevocation() {

Review Comment:
   I think we can keep this as "partitionsPendingRevocation".



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -2079,253 +2056,20 @@ public void testReconciliationProcess() {
 
         assertRecordsEquals(Collections.singletonList(
             RecordHelpers.newCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId3)
+                .setState(MemberState.STABLE)
                 .setMemberEpoch(11)
                 .setPreviousMemberEpoch(11)
-                .setTargetMemberEpoch(11)
                 .setAssignedPartitions(mkAssignment(
                     mkTopicAssignment(fooTopicId, 4, 5),
                     mkTopicAssignment(barTopicId, 1)))
                 .build())),
             result.records()
         );
 
-        assertEquals(ConsumerGroupMember.MemberState.STABLE, context.consumerGroupMemberState(groupId, memberId3));
+        assertEquals(MemberState.STABLE, context.consumerGroupMemberState(groupId, memberId3));
         assertEquals(ConsumerGroup.ConsumerGroupState.STABLE, context.consumerGroupState(groupId));
     }
 
-    @Test
-    public void testReconciliationRestartsWhenNewTargetAssignmentIsInstalled() {

Review Comment:
   I guess the unit test changes above (i.e. `testReconciliationProcess()`) test the opposite behavior?
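   Restating the new `build()` switch as a pure decision function makes the behavior change visible. In UNREVOKED_PARTITIONS the target epoch is never consulted, so a newly installed target assignment no longer restarts reconciliation while revocations are outstanding. The old code, by contrast, restarted whenever `targetAssignmentEpoch != member.targetMemberEpoch()`. This is a hypothetical sketch: `BuildDispatch`, `Action` and `decide` are illustrative names, and booleans replace the real partition bookkeeping.

```java
// Hypothetical restatement of the new CurrentAssignmentBuilder#build switch as a
// pure decision. Booleans stand in for the real partition bookkeeping.
final class BuildDispatch {
    enum MemberState { STABLE, UNREVOKED_PARTITIONS, UNRELEASED_PARTITIONS, UNKNOWN }

    enum Action {
        KEEP_MEMBER,                 // return the member unchanged
        COMPUTE_AT_CURRENT_EPOCH,    // computeNextAssignment(member.memberEpoch(), ...)
        COMPUTE_AT_NEXT_EPOCH,       // computeNextAssignment(member.memberEpoch() + 1, ...)
        FENCE_OR_RESET               // UNKNOWN: fence, or restart at the target epoch
    }

    static Action decide(
        MemberState state,
        int memberEpoch,
        int targetAssignmentEpoch,
        boolean ownedPartitionsReported,
        boolean stillOwnsRevokedPartition
    ) {
        switch (state) {
            case STABLE:
                // Only a stable member looks at the target epoch.
                return memberEpoch != targetAssignmentEpoch
                    ? Action.COMPUTE_AT_CURRENT_EPOCH
                    : Action.KEEP_MEMBER;
            case UNREVOKED_PARTITIONS:
                // The target epoch is not consulted: a new target does not restart
                // reconciliation while revocations are outstanding.
                if (!ownedPartitionsReported || stillOwnsRevokedPartition) {
                    return Action.KEEP_MEMBER;
                }
                return Action.COMPUTE_AT_NEXT_EPOCH;
            case UNRELEASED_PARTITIONS:
                return Action.COMPUTE_AT_CURRENT_EPOCH;
            default:
                return Action.FENCE_OR_RESET;
        }
    }
}
```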



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1211,13 +1192,99 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         // 1. The member reported its owned partitions;
         // 2. The member just joined or rejoined to group (epoch equals to zero);
         // 3. The member's assignment has been updated.
-        if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) {
+        if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) {
             response.setAssignment(createResponseAssignment(updatedMember));
         }
 
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Reconciles the current assignment of the member if needed.
+     *
+     * @param groupId               The group id.
+     * @param member                The member to reconcile.
+     * @param currentPartitionEpoch The function returning the current epoch of
+     *                              a given partition.
+     * @param targetAssignmentEpoch The target assignment epoch.
+     * @param targetAssignment      The target assignment.
+     * @param ownedTopicPartitions  The list of partitions owned by the member. This
+     *                              is reported in the ConsumerGroupHeartbeat API and
+     *                              it could be null if not provided.
+     * @param records               The list to accumulate any new records.
+     * @return The received member if no changes have been made; or a new
+     *         member containing the new assignment.
+     */
+    private ConsumerGroupMember maybeReconcile(
+        String groupId,
+        ConsumerGroupMember member,
+        BiFunction<Uuid, Integer, Integer> currentPartitionEpoch,
+        int targetAssignmentEpoch,
+        Assignment targetAssignment,
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions,
+        List<Record> records
+    ) {
+        if (member.isReconciledTo(targetAssignmentEpoch)) {
+            return member;
+        }
+
+        ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+            .withCurrentPartitionEpoch(currentPartitionEpoch)
+            .withOwnedTopicPartitions(ownedTopicPartitions)
+            .build();
+
+        if (!updatedMember.equals(member)) {
+            records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+
+            log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, "
+                     + "assignedPartitions={} and revokedPartitions={}.",
+                groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(),
+                formatAssignment(updatedMember.assignedPartitions()), formatAssignment(updatedMember.revokedPartitions()));
+
+            if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
+                scheduleConsumerGroupRebalanceTimeout(
+                    groupId,
+                    updatedMember.memberId(),
+                    updatedMember.memberEpoch(),
+                    updatedMember.rebalanceTimeoutMs()
+                );
+            } else {
+                cancelConsumerGroupRebalanceTimeout(groupId, updatedMember.memberId());
+            }
+        }
+
+        return updatedMember;
+    }
+
+    private String formatAssignment(

Review Comment:
   I guess it's too difficult to retrieve the actual topic names 😅
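   If topic names were wanted in the log line, one option would be to pass a topic-id-to-name lookup into the formatter and fall back to the id when the name is unknown. This is a hypothetical sketch. The `topic-[partitions]` output format and the lookup hook are illustrative, the PR's `formatAssignment` body is not shown in this hunk, and topic ids are `String`s instead of `Uuid`.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical formatter that prints topic names when they can be resolved.
final class AssignmentFormatter {
    /**
     * @param assignment      The partitions keyed by topic id.
     * @param topicNameLookup Returns the topic name for an id, or null if unknown
     *                        (a hypothetical hook, not an existing API).
     * @return Entries such as "foo-[0, 1]", sorted by topic id.
     */
    static String formatAssignment(
        Map<String, Set<Integer>> assignment,
        Function<String, String> topicNameLookup
    ) {
        return new TreeMap<>(assignment).entrySet().stream()
            .map(entry -> {
                String name = topicNameLookup.apply(entry.getKey());
                String label = name != null ? name : entry.getKey(); // fall back to the id
                return label + "-" + new TreeSet<>(entry.getValue());
            })
            .collect(Collectors.joining(", "));
    }
}
```

   Sorting by topic id and by partition keeps the output deterministic, which helps when comparing log lines.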



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##########
@@ -170,72 +127,119 @@ public CurrentAssignmentBuilder withOwnedTopicPartitions(
      * @return A new ConsumerGroupMember or the current one.
      */
     public ConsumerGroupMember build() {
-        // A new target assignment has been installed, we need to restart
-        // the reconciliation loop from the beginning.
-        if (targetAssignmentEpoch != member.targetMemberEpoch()) {
-            return transitionToNewTargetAssignmentState();
-        }
-
         switch (member.state()) {
-            // Check if the partitions have been revoked by the member.
-            case REVOKING:
-                return maybeTransitionFromRevokingToAssigningOrStable();
+            case STABLE:
+                // When the member is in the STABLE state, we verify if a newer
+                // epoch (or target assignment) is available. If it is, we can
+                // reconcile the member towards it. Otherwise, we return.
+                if (member.memberEpoch() != targetAssignmentEpoch) {
+                    return computeNextAssignment(
+                        member.memberEpoch(),
+                        member.assignedPartitions()
+                    );
+                } else {
+                    return member;
+                }
 
-            // Check if pending partitions have been freed up.
-            case ASSIGNING:
-                return maybeTransitionFromAssigningToAssigningOrStable();
+            case UNREVOKED_PARTITIONS:
+                // When the member is in the UNREVOKED_PARTITIONS state, we wait
+                // until the member has revoked the necessary partitions. They are
+                // considered revoked when they are not anymore reported in the
+                // owned partitions set in the ConsumerGroupHeartbeat API.
 
-            // Nothing to do.
-            case STABLE:
-                return member;
+                // If the member does not provide its owned partitions. We cannot
+                // progress.
+                if (ownedTopicPartitions == null) {
+                    return member;
+                }
+
+                // If the member provides its owned partitions. We verify if it still
+                // owns any of the revoked partitions. If it does, we cannot progress.
+                for (ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions : ownedTopicPartitions) {
+                    for (Integer partitionId : topicPartitions.partitions()) {
+                        boolean stillHasRevokedPartition = member
+                            .revokedPartitions()
+                            .getOrDefault(topicPartitions.topicId(), Collections.emptySet())
+                            .contains(partitionId);
+                        if (stillHasRevokedPartition) {
+                            return member;
+                        }
+                    }
+                }
+
+                // When the member has revoked all the pending partitions, it can
+                // transition to the next epoch (current + 1) and we can reconcile
+                // its state towards the latest target assignment.
+                return computeNextAssignment(
+                    member.memberEpoch() + 1,

Review Comment:
   Ah, I think I was not differentiating between the current and the next assignment. Here's my understanding:
   
   CurrentAssignmentBuilder#build prevents a member from advancing unless it has revoked all of its pending partitions.
   
   Once this condition is met, we can move forward and compute the next assignment. However, the member may not have revoked all partitions for the *next* assignment. In that case, we cannot move the member to the target epoch, only to the next epoch, hence the `member.memberEpoch() + 1`.
   
   Is my understanding correct?
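   The gate described above is the nested loop in the UNREVOKED_PARTITIONS case. The member may progress only once it has reported its owned partitions and none of them is still pending revocation. Pulled out as a standalone predicate, it could look like the hypothetical helper below. `RevocationGate`, `OwnedTopic` and `canProgress` are illustrative names, and `OwnedTopic` is a simplified stand-in for `ConsumerGroupHeartbeatRequestData.TopicPartitions`.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper mirroring the ownership check in the UNREVOKED_PARTITIONS case.
final class RevocationGate {
    // Simplified stand-in for ConsumerGroupHeartbeatRequestData.TopicPartitions.
    static final class OwnedTopic {
        final String topicId;
        final List<Integer> partitions;

        OwnedTopic(String topicId, List<Integer> partitions) {
            this.topicId = topicId;
            this.partitions = partitions;
        }
    }

    /**
     * @param revokedPartitions The partitions pending revocation, keyed by topic id.
     * @param ownedTopics       The owned partitions from the heartbeat, or null if not reported.
     * @return true if the member no longer owns any partition pending revocation.
     */
    static boolean canProgress(Map<String, Set<Integer>> revokedPartitions, List<OwnedTopic> ownedTopics) {
        if (ownedTopics == null) {
            return false; // Owned partitions not reported: wait for a later heartbeat.
        }
        for (OwnedTopic topic : ownedTopics) {
            Set<Integer> revoked = revokedPartitions.getOrDefault(topic.topicId, Set.of());
            for (Integer partition : topic.partitions) {
                if (revoked.contains(partition)) {
                    return false; // Still owns a partition it was asked to revoke.
                }
            }
        }
        return true;
    }
}
```

   When `canProgress` returns true, `build()` calls `computeNextAssignment(member.memberEpoch() + 1, ...)`. Whether that epoch is kept or replaced by the target epoch is decided inside `computeNextAssignment`, whose body is not part of this hunk.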



-- 
This is an automated message from the Apache Git Service.