Copilot commented on code in PR #21508:
URL: https://github.com/apache/kafka/pull/21508#discussion_r2827514018


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java:
##########
@@ -289,8 +289,8 @@ public static CoordinatorRecord 
newConsumerGroupCurrentAssignmentRecord(
                     .setMemberEpoch(member.memberEpoch())
                     .setPreviousMemberEpoch(member.previousMemberEpoch())
                     .setState(member.state().value())
-                    
.setAssignedPartitions(toTopicPartitions(member.assignedPartitions()))
-                    
.setPartitionsPendingRevocation(toTopicPartitions(member.partitionsPendingRevocation())),
+                    
.setAssignedPartitions(toTopicPartitions(member.assignedPartitionsWithEpochs()))
+                    
.setPartitionsPendingRevocation(toTopicPartitions(member.partitionsPendingRevocationWithEpochs())),

Review Comment:
   newConsumerGroupCurrentAssignmentRecord now exclusively serializes 
member.assignedPartitionsWithEpochs()/partitionsPendingRevocationWithEpochs(). 
If those maps are empty while the Set-based assignment fields are populated 
(possible via existing Builder setters), this will write records with empty 
assignments and effectively lose the current assignment on disk. Consider 
adding a defensive fallback (e.g., derive epoch maps from 
assignedPartitions()/partitionsPendingRevocation() with a sensible default 
epoch) or validate/throw if the member is internally inconsistent before 
serializing.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -672,9 +672,23 @@ public CommitPartitionValidator validateOffsetCommit(
             throw new UnsupportedVersionException("OffsetCommit version 9 or 
above must be used " +
                 "by members using the modern group protocol");
         }
+        // For member using the classic protocol, use strict epoch validation.
+        if (member.useClassicProtocol()) {
+            validateMemberEpoch(memberEpoch, member.memberEpoch(), true);
+            return CommitPartitionValidator.NO_OP;
+        }
 
-        validateMemberEpoch(memberEpoch, member.memberEpoch(), 
member.useClassicProtocol());
-        return CommitPartitionValidator.NO_OP;
+        // For member using the consumer protocol
+        // Case 1: Strict epoch match
+        if (memberEpoch == member.memberEpoch()) {
+            return CommitPartitionValidator.NO_OP;
+        }
+        // Case 2:Client epoch > broker epoch, which is an invalid request

Review Comment:
   Minor comment formatting: add a space after the colon in "Case 2:Client".
   ```suggestion
           // Case 2: Client epoch > broker epoch, which is an invalid request
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -28,6 +28,9 @@
 import org.apache.kafka.coordinator.group.modern.ModernGroupMember;
 
 import java.util.ArrayList;
+import java.util.stream.Collectors;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;

Review Comment:
   Import order looks inconsistent with the rest of the module: 
java.util.stream.Collectors is currently placed between java.util.ArrayList and 
java.util.Collections. Other files keep java.util.* imports together and place 
java.util.stream.* afterwards (e.g., Utils.java:36-49). Please reorder these 
imports to match the established style (this may also be enforced by 
Checkstyle).



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -672,9 +672,23 @@ public CommitPartitionValidator validateOffsetCommit(
             throw new UnsupportedVersionException("OffsetCommit version 9 or 
above must be used " +
                 "by members using the modern group protocol");
         }
+        // For member using the classic protocol, use strict epoch validation.
+        if (member.useClassicProtocol()) {
+            validateMemberEpoch(memberEpoch, member.memberEpoch(), true);
+            return CommitPartitionValidator.NO_OP;
+        }
 
-        validateMemberEpoch(memberEpoch, member.memberEpoch(), 
member.useClassicProtocol());
-        return CommitPartitionValidator.NO_OP;
+        // For member using the consumer protocol
+        // Case 1: Strict epoch match
+        if (memberEpoch == member.memberEpoch()) {
+            return CommitPartitionValidator.NO_OP;
+        }
+        // Case 2:Client epoch > broker epoch, which is an invalid request
+        if (memberEpoch > member.memberEpoch()) {
+            throw new StaleMemberEpochException(String.format("The received 
member epoch %d is larger than "
+                + "the expected member epoch %d.", memberEpoch, 
member.memberEpoch()));
+        }
+        return createAssignmentEpochValidator(member, memberEpoch);

Review Comment:
   The new relaxed offset-commit validation path (returning a per-partition 
validator when receivedMemberEpoch < broker member epoch) doesn’t appear to be 
covered by unit tests. It would be useful to add ConsumerGroupTest cases that 
(1) accept commits when assignmentEpoch <= receivedEpoch < brokerEpoch for 
assigned/pending-revocation partitions, and (2) reject commits when the 
partition isn’t assigned or when receivedEpoch < assignmentEpoch, to prevent 
regressions in the new behavior.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -205,6 +212,66 @@ public Builder 
setClassicMemberMetadata(ConsumerGroupMemberMetadataValue.Classic
             return this;
         }
 
+        public Builder setAssignedPartitionsWithEpochs(Map<Uuid, Map<Integer, 
Integer>> assignedPartitionsWithEpochs) {
+            this.assignedPartitionsWithEpochs = assignedPartitionsWithEpochs;
+            this.assignedPartitions = 
assignedPartitionsWithEpochs.entrySet().stream()
+                .collect(Collectors.toMap(
+                    Map.Entry::getKey,
+                    e -> new HashSet<>(e.getValue().keySet())
+                ));
+            return this;
+        }
+
+        public Builder setPartitionsPendingRevocationWithEpochs(Map<Uuid, 
Map<Integer, Integer>> partitionsPendingRevocationWithEpochs) {
+            this.partitionsPendingRevocationWithEpochs = 
partitionsPendingRevocationWithEpochs;
+            this.partitionsPendingRevocation = 
partitionsPendingRevocationWithEpochs.entrySet().stream()
+                .collect(Collectors.toMap(
+                    Map.Entry::getKey,
+                    e -> new HashSet<>(e.getValue().keySet())
+                ));
+            return this;
+        }

Review Comment:
   setAssignedPartitionsWithEpochs / setPartitionsPendingRevocationWithEpochs 
keep the Set-based fields in sync, but the Builder still also has the older 
setAssignedPartitions(...) and setPartitionsPendingRevocation(...) setters. 
Given GroupCoordinatorRecordHelpers now serializes 
assignedPartitionsWithEpochs()/partitionsPendingRevocationWithEpochs(), any 
code path that uses the older setters (without also setting the epoch maps) can 
drop epoch information and even persist empty assignments. Consider enforcing 
invariants in build() (e.g., derive epoch maps when missing), or 
deprecating/removing the Set-based setters to prevent constructing an 
internally inconsistent ConsumerGroupMember.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -672,9 +672,23 @@ public CommitPartitionValidator validateOffsetCommit(
             throw new UnsupportedVersionException("OffsetCommit version 9 or 
above must be used " +
                 "by members using the modern group protocol");
         }
+        // For member using the classic protocol, use strict epoch validation.
+        if (member.useClassicProtocol()) {

Review Comment:
   Grammar in this comment is off: consider changing "For member using the 
classic protocol" to "For members using the classic protocol" (or "For a member 
using...") for clarity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to