dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1522869232


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1211,13 +1192,99 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         // 1. The member reported its owned partitions;
         // 2. The member just joined or rejoined to group (epoch equals to 
zero);
         // 3. The member's assignment has been updated.
-        if (ownedTopicPartitions != null || memberEpoch == 0 || 
assignmentUpdated) {
+        if (ownedTopicPartitions != null || memberEpoch == 0 || 
hasAssignedPartitionsChanged(member, updatedMember)) {
             response.setAssignment(createResponseAssignment(updatedMember));
         }
 
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Reconciles the current assignment of the member if needed.
+     *
+     * @param groupId               The group id.
+     * @param member                The member to reconcile.
+     * @param currentPartitionEpoch The function returning the current epoch of
+     *                              a given partition.
+     * @param targetAssignmentEpoch The target assignment epoch.
+     * @param targetAssignment      The target assignment.
+     * @param ownedTopicPartitions  The list of partitions owned by the 
member. This
+     *                              is reported in the ConsumerGroupHeartbeat 
API and
+     *                              it could be null if not provided.
+     * @param records               The list to accumulate any new records.
+     * @return The received member if no changes have been made; or a new
+     *         member containing the new assignment.
+     */
+    private ConsumerGroupMember maybeReconcile(
+        String groupId,
+        ConsumerGroupMember member,
+        BiFunction<Uuid, Integer, Integer> currentPartitionEpoch,
+        int targetAssignmentEpoch,
+        Assignment targetAssignment,
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions,
+        List<Record> records
+    ) {
+        if (member.isReconciledTo(targetAssignmentEpoch)) {
+            return member;
+        }
+
+        ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+            .withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+            .withCurrentPartitionEpoch(currentPartitionEpoch)
+            .withOwnedTopicPartitions(ownedTopicPartitions)
+            .build();
+
+        if (!updatedMember.equals(member)) {
+            records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+
+            log.info("[GroupId {}] Member {} new assignment state: epoch={}, 
previousEpoch={}, state={}, "
+                     + "assignedPartitions={} and revokedPartitions={}.",
+                groupId, updatedMember.memberId(), 
updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), 
updatedMember.state(),
+                formatAssignment(updatedMember.assignedPartitions()), 
formatAssignment(updatedMember.revokedPartitions()));
+
+            if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
+                scheduleConsumerGroupRebalanceTimeout(
+                    groupId,
+                    updatedMember.memberId(),
+                    updatedMember.memberEpoch(),
+                    updatedMember.rebalanceTimeoutMs()
+                );
+            } else {

Review Comment:
   > So just so I get it straight -- we have a timeout only for the revocation 
of the partitions. If we hit the timeout, the member is fenced and I assume we 
can count those partitions as revoked. (Or we no longer have to wait for them 
to be revoked)
   
   Yep. This is correct. As soon as the member is fenced, all its partitions 
become available immediately.
   
   > At the point all partitions are revoked, we can try to assign new 
partitions to members. In this case, are we relying on the heartbeat to kick 
out members out if they aren't responding? And we expect if requests are going 
through the assignment will occur?
   
   Correct. We rely on the heartbeat to maintain the session of the member. If 
the member does not heartbeat at least once within the session timeout, we 
fence it.
   
   > Just trying to confirm the reason for the separate timeout here. Is it 
because revoking is more likely to fail even though the heartbeat still goes 
through, but not the same for assigning?
   
   The reason is that there actually two processes piggy backing on the 
heartbeat: 1) the session; and 2) the assignment reconciliation. The session is 
maintained by the regular heartbeat. The assignment reconciliation relies on 
the reported partitions provided by the member. It is totally possible for a 
member to heartbeat to maintain its sessions while being stuck on the 
assignment part. This is the main reason why we need the second timeout. We 
want to ensure that a member cannot block all the others forever if it does not 
releases its revoked partitions.
   
   Regarding the assigning case, we don't apply the same timeout because we 
have discovered that the consumer could actually be stuck when a partition that 
does not exist anymore is assigned to it. In this case, the consumer won't be 
able to get the metadata for the newly assigned partition and it will retry 
forever. On the server, if we wait on the member to confirms that assignment, 
we cannot revoke the deleted partitions. This is the main reason why we treat 
both the revocation and the assignment differently. The revocation blocks the 
reconciliation process while the assignment does not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to