squah-confluent commented on code in PR #21558:
URL: https://github.com/apache/kafka/pull/21558#discussion_r2875882416


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -264,20 +292,23 @@ public ConsumerGroupMember build() {
     private final String serverAssignorName;
 
     /**
-     * The partitions assigned to this member.
+     * The partitions assigned to this member and their assignment epochs.
+     * A map of topic ids to partitions to assignment epochs.
      */
-    private final Map<Uuid, Set<Integer>> assignedPartitions;
+    private final Map<Uuid, Map<Integer, Integer>> assignedPartitions;
 
     /**
-     * The partitions being revoked by this member.
+     * The partitions awaiting revocation from this member and their 
assignment epochs.
+     * A map of topic ids to partitions to assignment epochs.
      */
-    private final Map<Uuid, Set<Integer>> partitionsPendingRevocation;
+    private final Map<Uuid, Map<Integer, Integer>> partitionsPendingRevocation;
 
     /**
      * The classic member metadata if the consumer uses the classic protocol.
      */
     private final ConsumerGroupMemberMetadataValue.ClassicMemberMetadata 
classicMemberMetadata;
 
+

Review Comment:
   nit: stray newline change
   ```suggestion
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -432,7 +495,7 @@ public ConsumerGroupDescribeResponseData.Member 
asConsumerGroupDescribeMember(
             .setMemberType(useClassicProtocol() ? (byte) 0 : (byte) 1);
     }
 
-    private static List<ConsumerGroupDescribeResponseData.TopicPartitions> 
topicPartitionsFromMap(
+    private static List<ConsumerGroupDescribeResponseData.TopicPartitions> 
topicPartitionsFromAssignmentWithoutEpochs(

Review Comment:
   Since we've chosen to remove the `withEpochs` variant of this method, I 
would just call it `topicPartitionsFromAssignment`.
   
   ```suggestion
       private static List<ConsumerGroupDescribeResponseData.TopicPartitions> 
topicPartitionsFromAssignment(
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -380,37 +381,39 @@ private ConsumerGroupMember computeNextAssignment(
         for (Uuid topicId : allTopicIds) {
             Set<Integer> target = targetAssignment.partitions()
                 .getOrDefault(topicId, Set.of());
-            Set<Integer> currentAssignedPartitions = memberAssignedPartitions
-                .getOrDefault(topicId, Set.of());
+            Map<Integer, Integer> currentAssignedPartitions = 
memberAssignedPartitions
+                .getOrDefault(topicId, Map.of());
 
             // If the member is no longer subscribed to the topic, treat its 
target assignment as empty.
             if (!subscribedTopicIds.contains(topicId)) {
                 target = Set.of();
             }
 
             // New Assigned Partitions = Previous Assigned Partitions ∩ Target
-            Set<Integer> assignedPartitions = new 
HashSet<>(currentAssignedPartitions);
-            assignedPartitions.retainAll(target);
+            Map<Integer, Integer> assignedPartitions = new 
HashMap<>(currentAssignedPartitions);
+            assignedPartitions.keySet().retainAll(target);
 
             // Partitions Pending Revocation = Previous Assigned Partitions - 
New Assigned Partitions
-            Set<Integer> partitionsPendingRevocation = new 
HashSet<>(currentAssignedPartitions);
-            partitionsPendingRevocation.removeAll(assignedPartitions);
+            Map<Integer, Integer> partitionsPendingRevocation = new 
HashMap<>(currentAssignedPartitions);
+            
partitionsPendingRevocation.keySet().removeAll(assignedPartitions.keySet());
 
             // Partitions Pending Assignment = Target - New Assigned 
Partitions - Unreleased Partitions
             Set<Integer> partitionsPendingAssignment = new HashSet<>(target);
-            partitionsPendingAssignment.removeAll(assignedPartitions);
+            partitionsPendingAssignment.removeAll(assignedPartitions.keySet());
             hasUnreleasedPartitions = 
partitionsPendingAssignment.removeIf(partitionId ->
                 currentPartitionEpoch.apply(topicId, partitionId) != -1 &&
                 // Don't consider a partition unreleased if it is owned by the 
current member
                 // because it is pending revocation. This is safe to do since 
only a single member
                 // can own a partition at a time.
-                !member.partitionsPendingRevocation().getOrDefault(topicId, 
Set.of()).contains(partitionId)
+                !member.partitionsPendingRevocation().getOrDefault(topicId, 
Map.of()).containsKey(partitionId)
             ) || hasUnreleasedPartitions;
 
+            // Build epochs map for assigned partitions, preserve existing 
epochs
             if (!assignedPartitions.isEmpty()) {
                 newAssignedPartitions.put(topicId, assignedPartitions);
             }
 
+            // Build epochs map for partitions pending revocation, preserve 
existing epochs

Review Comment:
   I'm not sure this comment is necessary any more.
   ```suggestion
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java:
##########
@@ -554,16 +561,20 @@ public void 
testUnrevokedPartitionsToStableWithReturnedPartitionsPendingRevocati
                     .setPartitions(Arrays.asList(5, 6))))
             .build();
 
+        // Retained partitions keep original epoch (10), partition 4 was 
pending revocation so gets new epoch (12),
+        // new partition 7 also gets new epoch (12)
+        Map<Uuid, Map<Integer, Integer>> expectedAssignment = Map.of(
+            topicId1, Map.of(2, 10, 3, 10, 4, 12),
+            topicId2, Map.of(5, 10, 6, 10, 7, 12)
+        );
+
         assertEquals(
             new ConsumerGroupMember.Builder("member")
                 .setState(MemberState.STABLE)
                 .setMemberEpoch(12)
                 .setPreviousMemberEpoch(10)
                 .setSubscribedTopicNames(List.of(topic1, topic2))
-                .setAssignedPartitions(mkAssignment(
-                    mkTopicAssignment(topicId1, 2, 3, 4),
-                    mkTopicAssignment(topicId2, 5, 6, 7)))
-                .setPartitionsPendingRevocation(Map.of())
+                .setAssignedPartitions(expectedAssignment)

Review Comment:
   ```suggestion
                   .setAssignedPartitions(mkAssignmentWithEpochs(
                       mkTopicAssignmentWithEpochs(topicId1, 10, 2, 3),
                       mkTopicAssignmentWithEpochs(topicId1, 12, 4),
                       mkTopicAssignmentWithEpochs(topicId2, 10, 5, 6),
                       mkTopicAssignmentWithEpochs(topicId2, 12, 7)))
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java:
##########
@@ -804,13 +804,19 @@ public static CoordinatorRecord 
newShareGroupStatePartitionMetadataRecord(
     }
 
     private static 
List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> 
toTopicPartitions(
-        Map<Uuid, Set<Integer>> topicPartitions
-    ) {
-        List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> topics 
= new ArrayList<>(topicPartitions.size());
-        topicPartitions.forEach((topicId, partitions) ->
+        Map<Uuid, Map<Integer, Integer>> topicPartitionsWithEpochs

Review Comment:
   Could we rename the parameter to `assignment`?
   ```suggestion
           Map<Uuid, Map<Integer, Integer>> assignment
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -380,37 +381,39 @@ private ConsumerGroupMember computeNextAssignment(
         for (Uuid topicId : allTopicIds) {
             Set<Integer> target = targetAssignment.partitions()
                 .getOrDefault(topicId, Set.of());
-            Set<Integer> currentAssignedPartitions = memberAssignedPartitions
-                .getOrDefault(topicId, Set.of());
+            Map<Integer, Integer> currentAssignedPartitions = 
memberAssignedPartitions
+                .getOrDefault(topicId, Map.of());
 
             // If the member is no longer subscribed to the topic, treat its 
target assignment as empty.
             if (!subscribedTopicIds.contains(topicId)) {
                 target = Set.of();
             }
 
             // New Assigned Partitions = Previous Assigned Partitions ∩ Target
-            Set<Integer> assignedPartitions = new 
HashSet<>(currentAssignedPartitions);
-            assignedPartitions.retainAll(target);
+            Map<Integer, Integer> assignedPartitions = new 
HashMap<>(currentAssignedPartitions);
+            assignedPartitions.keySet().retainAll(target);
 
             // Partitions Pending Revocation = Previous Assigned Partitions - 
New Assigned Partitions
-            Set<Integer> partitionsPendingRevocation = new 
HashSet<>(currentAssignedPartitions);
-            partitionsPendingRevocation.removeAll(assignedPartitions);
+            Map<Integer, Integer> partitionsPendingRevocation = new 
HashMap<>(currentAssignedPartitions);
+            
partitionsPendingRevocation.keySet().removeAll(assignedPartitions.keySet());
 
             // Partitions Pending Assignment = Target - New Assigned 
Partitions - Unreleased Partitions
             Set<Integer> partitionsPendingAssignment = new HashSet<>(target);
-            partitionsPendingAssignment.removeAll(assignedPartitions);
+            partitionsPendingAssignment.removeAll(assignedPartitions.keySet());
             hasUnreleasedPartitions = 
partitionsPendingAssignment.removeIf(partitionId ->
                 currentPartitionEpoch.apply(topicId, partitionId) != -1 &&
                 // Don't consider a partition unreleased if it is owned by the 
current member
                 // because it is pending revocation. This is safe to do since 
only a single member
                 // can own a partition at a time.
-                !member.partitionsPendingRevocation().getOrDefault(topicId, 
Set.of()).contains(partitionId)
+                !member.partitionsPendingRevocation().getOrDefault(topicId, 
Map.of()).containsKey(partitionId)
             ) || hasUnreleasedPartitions;
 
+            // Build epochs map for assigned partitions, preserve existing 
epochs

Review Comment:
   I'm not sure this comment is necessary any more.
   ```suggestion
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -291,25 +291,26 @@ private boolean ownsRevokedPartitions(
      * This method is a lot faster than running the full reconciliation logic 
in computeNextAssignment.
      *
      * @param memberEpoch               The epoch of the member to use.
-     * @param memberAssignedPartitions  The assigned partitions of the member 
to use.
+     * @param memberAssignedPartitions  The assigned partitions of the member 
to use and their assignment epochs.
      * @return A new ConsumerGroupMember.
      */
     private ConsumerGroupMember updateCurrentAssignment(
         int memberEpoch,
-        Map<Uuid, Set<Integer>> memberAssignedPartitions
+        Map<Uuid, Map<Integer, Integer>> memberAssignedPartitions
     ) {
         Set<Uuid> subscribedTopicIds = subscribedTopicIds();
 
         // Reuse the original map if no topics need to be removed.
-        Map<Uuid, Set<Integer>> newAssignedPartitions;
-        Map<Uuid, Set<Integer>> newPartitionsPendingRevocation;
+        Map<Uuid, Map<Integer, Integer>> newAssignedPartitions;
+        Map<Uuid, Map<Integer, Integer>> newPartitionsPendingRevocation = 
Map.of();
+
         if (subscribedTopicIds.isEmpty() && 
member.partitionsPendingRevocation().isEmpty()) {
             newAssignedPartitions = Map.of();
+            // Move all assigned to pending revocation
             newPartitionsPendingRevocation = memberAssignedPartitions;
         } else {
             newAssignedPartitions = memberAssignedPartitions;

Review Comment:
   Could we initialize `newPartitionsPendingRevocation` in the else branch, but 
not clone `member.partitionsPendingRevocation`? Initializing to an empty map 
sets up a footgun later on if the else branch was taken, where using 
`newPartitionsPendingRevocation` is a bug unless a topic became unsubscribed.
   
   I missed this last time, sorry.
   
   ```suggestion
               newAssignedPartitions = memberAssignedPartitions;
               newPartitionsPendingRevocation = 
member.partitionsPendingRevocation();
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1043,20 +1043,20 @@ private void maybeRemovePartitionEpoch(
     }
 
     /**
-     * Removes the partition epochs based on the provided assignment.
+     * Removes the partition epochs based on the provided assignment and 
member epoch.
      *
-     * @param assignment    The assignment.
-     * @param expectedEpoch The expected epoch.
+     * @param assignment    The assignment with epochs. The assignment epochs 
are ignored.
+     * @param expectedEpoch The expected member epoch.
      * package-private for testing.
      */
     void removePartitionEpochs(
-        Map<Uuid, Set<Integer>> assignment,
+        Map<Uuid, Map<Integer, Integer>> assignment,
         int expectedEpoch
     ) {
-        assignment.forEach((topicId, assignedPartitions) -> {
+        assignment.forEach((topicId, partitionEpochMap) -> {

Review Comment:
   Thanks for fixing up the names. We're still inconsistent though.
   
   We call it `partitionEpochMap` in `ConsumerGroup.addPartitionEpochs`, 
`removePartitionEpochs` and `GroupCoordinatorRecordHelpers.toTopicPartitions`.
   
   We call it `partitionEpochs` in 
`ConsumerGroupMember.resetAssignedPartitionsEpochsToZero`, `assignmentEpoch`, 
`pendingRevocationEpoch`, `Utils.assignmentFromTopicPartitions`, 
`Utils.toAssignmentWithEpochs`, 
`AssignmentTestUtil.mkTopicAssignmentWithEpochs` and 
`CurrentAssignmentBuilderBenchmark.setupMember`.
   
   It looks like `partitionEpochs` is the more natural name since we end up 
using it more often?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java:
##########
@@ -71,6 +71,35 @@ public static Map<Uuid, Set<Integer>> 
mkOrderedAssignment(Map.Entry<Uuid, Set<In
         return Collections.unmodifiableMap(assignment);
     }
 
+    public static Map.Entry<Uuid, Map<Integer, Integer>> 
mkTopicAssignmentWithEpochs(
+        Uuid topicId,
+        int epoch,
+        Integer... partitions
+    ) {
+        Map<Integer, Integer> partitionEpochs = new HashMap<>();
+        for (Integer partition : partitions) {
+            partitionEpochs.put(partition, epoch);
+        }
+        return new AbstractMap.SimpleEntry<>(topicId, partitionEpochs);
+    }
+
+    @SafeVarargs
+    public static Map<Uuid, Map<Integer, Integer>> mkAssignmentWithEpochs(
+        Map.Entry<Uuid, Map<Integer, Integer>>... entries
+    ) {
+        Map<Uuid, Map<Integer, Integer>> assignment = new HashMap<>();
+        for (Map.Entry<Uuid, Map<Integer, Integer>> entry : entries) {
+            assignment.merge(entry.getKey(), new HashMap<>(entry.getValue()), 
(existing, newValue) -> {
+                Map<Integer, Integer> merged = new HashMap<>(existing);
+                merged.putAll(newValue);
+                return merged;
+            });

Review Comment:
   Does this work?
   ```suggestion
               assignment
                   .computeIfAbsent(entry.getKey(), __ -> new HashMap<>())
                   .putAll(entry.getValue());
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -2476,9 +2489,7 @@ member2RejoinId, new MemberAssignmentImpl(mkAssignment(
             .setRebalanceTimeoutMs(5000)
             .setSubscribedTopicNames(List.of("foo", "bar"))
             .setServerAssignorName("range")
-            .setAssignedPartitions(mkAssignment(
-                mkTopicAssignment(fooTopicId, 3, 4, 5),
-                mkTopicAssignment(barTopicId, 0, 1, 2)))
+            .setAssignedPartitions(expectedRejoinedAssignment)

Review Comment:
   ```suggestion
               .setAssignedPartitions(mkAssignmentWithEpochs(
                   mkTopicAssignmentWithEpochs(fooTopicId, 0, 3, 4, 5),
                   mkTopicAssignmentWithEpochs(barTopicId, 11, 0, 1, 2)))
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java:
##########
@@ -235,4 +239,43 @@ void testComputeGroupHashWithSameKeyButDifferentValue() {
         );
         assertNotEquals(Utils.computeGroupHash(map1), 
Utils.computeGroupHash(map2));
     }
+
+    @Test
+    void testAssignmentFromTopicPartitionsWithNegativeDefaultEpoch() {
+        List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> 
topicPartitions = List.of(
+            new ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+                .setTopicId(FOO_TOPIC_ID)
+                .setPartitions(Arrays.asList(0, 1, 2))
+        );
+
+        Map<Uuid, Map<Integer, Integer>> result = 
Utils.assignmentFromTopicPartitions(
+            topicPartitions,
+            LEAVE_GROUP_STATIC_MEMBER_EPOCH // -2
+        );
+
+        // Verify epoch is adjusted to 0
+        assertEquals(Map.of(
+            FOO_TOPIC_ID, Map.of(0, 0, 1, 0, 2, 0)
+        ), result);
+    }
+
+    @Test
+    void testAssignmentFromTopicPartitionsWithEpochsProvided() {
+        List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> 
topicPartitions = List.of(
+            new ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+                .setTopicId(FOO_TOPIC_ID)
+                .setPartitions(Arrays.asList(0, 1, 2))
+                .setAssignmentEpochs(Arrays.asList(5, 6, 7))
+        );
+
+        Map<Uuid, Map<Integer, Integer>> result = 
Utils.assignmentFromTopicPartitions(
+            topicPartitions,
+            LEAVE_GROUP_STATIC_MEMBER_EPOCH  // -2
+        );
+
+        // Verify assignment epochs are used
+        assertEquals(Map.of(
+            FOO_TOPIC_ID, Map.of(0, 5, 1, 6, 2, 7)
+        ), result);

Review Comment:
   Can we use `mkAssignmentWithEpochs` (with `mkTopicAssignmentWithEpochs`) to construct the expected result?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -3561,14 +3574,18 @@ memberId3, new MemberAssignmentImpl(mkAssignment(
             result.response()
         );
 
+        // member2: partition 3 (fooTopicId) and 2 (barTopicId) were retained 
from epoch 10,
+        // partition 2 (fooTopicId) is newly assigned at epoch 11
+        Map<Uuid, Map<Integer, Integer>> member2ExpectedAssignment = Map.of(
+            fooTopicId, new HashMap<>(Map.of(2, 11, 3, 10)),
+            barTopicId, Map.of(2, 10)
+        );
         assertRecordsEquals(List.of(
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
new ConsumerGroupMember.Builder(memberId2)
                 .setState(MemberState.STABLE)
                 .setMemberEpoch(11)
                 .setPreviousMemberEpoch(10)
-                .setAssignedPartitions(mkAssignment(
-                    mkTopicAssignment(fooTopicId, 2, 3),
-                    mkTopicAssignment(barTopicId, 2)))
+                .setAssignedPartitions(member2ExpectedAssignment)

Review Comment:
   ```suggestion
                   .setAssignedPartitions(mkAssignmentWithEpochs(
                       mkTopicAssignmentWithEpochs(fooTopicId, 11, 2),
                       mkTopicAssignmentWithEpochs(fooTopicId, 10, 3),
                       mkTopicAssignmentWithEpochs(barTopicId, 10, 2)))
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java:
##########
@@ -71,6 +71,35 @@ public static Map<Uuid, Set<Integer>> 
mkOrderedAssignment(Map.Entry<Uuid, Set<In
         return Collections.unmodifiableMap(assignment);
     }
 
+    public static Map.Entry<Uuid, Map<Integer, Integer>> 
mkTopicAssignmentWithEpochs(
+        Uuid topicId,
+        int epoch,
+        Integer... partitions
+    ) {
+        Map<Integer, Integer> partitionEpochs = new HashMap<>();
+        for (Integer partition : partitions) {
+            partitionEpochs.put(partition, epoch);
+        }
+        return new AbstractMap.SimpleEntry<>(topicId, partitionEpochs);
+    }
+
+    @SafeVarargs
+    public static Map<Uuid, Map<Integer, Integer>> mkAssignmentWithEpochs(
+        Map.Entry<Uuid, Map<Integer, Integer>>... entries
+    ) {
+        Map<Uuid, Map<Integer, Integer>> assignment = new HashMap<>();
+        for (Map.Entry<Uuid, Map<Integer, Integer>> entry : entries) {
+            assignment.merge(entry.getKey(), new HashMap<>(entry.getValue()), 
(existing, newValue) -> {
+                Map<Integer, Integer> merged = new HashMap<>(existing);
+                merged.putAll(newValue);
+                return merged;
+            });
+        }
+        Map<Uuid, Map<Integer, Integer>> result = new LinkedHashMap<>();
+        assignment.forEach((k, v) -> result.put(k, 
Collections.unmodifiableMap(v)));
+        return Collections.unmodifiableMap(result);

Review Comment:
   The `LinkedHashMap` is populated by iterating the `HashMap`, so its ordering is the same as the `HashMap`'s and the extra copy adds nothing.
   ```suggestion
           assignment.replaceAll((__, innerMap) -> 
Collections.unmodifiableMap(innerMap));
           return Collections.unmodifiableMap(assignment);
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java:
##########
@@ -235,4 +239,43 @@ void testComputeGroupHashWithSameKeyButDifferentValue() {
         );
         assertNotEquals(Utils.computeGroupHash(map1), 
Utils.computeGroupHash(map2));
     }
+
+    @Test
+    void testAssignmentFromTopicPartitionsWithNegativeDefaultEpoch() {
+        List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> 
topicPartitions = List.of(
+            new ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+                .setTopicId(FOO_TOPIC_ID)
+                .setPartitions(Arrays.asList(0, 1, 2))
+        );
+
+        Map<Uuid, Map<Integer, Integer>> result = 
Utils.assignmentFromTopicPartitions(
+            topicPartitions,
+            LEAVE_GROUP_STATIC_MEMBER_EPOCH // -2
+        );
+
+        // Verify epoch is adjusted to 0
+        assertEquals(Map.of(
+            FOO_TOPIC_ID, Map.of(0, 0, 1, 0, 2, 0)
+        ), result);

Review Comment:
   Can we use `mkAssignmentWithEpochs` (with `mkTopicAssignmentWithEpochs`) to construct the expected result?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1477,9 +1481,9 @@ public void 
testUpdatingSubscriptionTriggersNewTargetAssignment() {
             .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
             .setSubscribedTopicNames(List.of("foo", "bar"))
             .setServerAssignorName("range")
-            .setAssignedPartitions(mkAssignment(
-                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
-                mkTopicAssignment(barTopicId, 0, 1, 2)))
+            .setAssignedPartitions(Map.of(
+                fooTopicId, 
toAssignmentWithEpochs(mkAssignment(mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 
4, 5)), 10).get(fooTopicId),
+                barTopicId, 
toAssignmentWithEpochs(mkAssignment(mkTopicAssignment(barTopicId, 0, 1, 2)), 
11).get(barTopicId)))

Review Comment:
   ```suggestion
               .setAssignedPartitions(mkAssignmentWithEpochs(
                   mkTopicAssignmentWithEpochs(fooTopicId, 10, 0, 1, 2, 3, 4, 
5),
                   mkTopicAssignmentWithEpochs(barTopicId, 11, 0, 1, 2)))
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java:
##########
@@ -114,15 +115,19 @@ public void testStableToStableWithNewPartitions() {
             .withCurrentPartitionEpoch((topicId, partitionId) -> -1)
             .build();
 
+        // Retained partitions keep their original epoch (10), new partitions 
get the new epoch (11)
+        Map<Uuid, Map<Integer, Integer>> expectedAssignment = Map.of(
+            topicId1, Map.of(1, 10, 2, 10, 3, 10, 4, 11),
+            topicId2, Map.of(4, 10, 5, 10, 6, 10, 7, 11)
+        );
+
         assertEquals(
             new ConsumerGroupMember.Builder("member")
                 .setState(MemberState.STABLE)
                 .setMemberEpoch(11)
                 .setPreviousMemberEpoch(10)
                 .setSubscribedTopicNames(List.of(topic1, topic2))
-                .setAssignedPartitions(mkAssignment(
-                    mkTopicAssignment(topicId1, 1, 2, 3, 4),
-                    mkTopicAssignment(topicId2, 4, 5, 6, 7)))
+                .setAssignedPartitions(expectedAssignment)

Review Comment:
   ```suggestion
                   .setAssignedPartitions(mkAssignmentWithEpochs(
                       mkTopicAssignmentWithEpochs(topicId1, 10, 1, 2, 3),
                       mkTopicAssignmentWithEpochs(topicId1, 11, 4),
                       mkTopicAssignmentWithEpochs(topicId2, 10, 4, 5, 6),
                       mkTopicAssignmentWithEpochs(topicId2, 11, 7)))
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -14123,9 +14148,7 @@ memberId2, new MemberAssignmentImpl(mkAssignment(
             .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
             .setState(MemberState.STABLE)
             .setSubscribedTopicNames(List.of(fooTopicName, barTopicName, 
zarTopicName))
-            .setAssignedPartitions(mkAssignment(
-                mkTopicAssignment(fooTopicId, 0),
-                mkTopicAssignment(zarTopicId, 0)))
+            .setAssignedPartitions(expectedAssignment)

Review Comment:
   ```suggestion
               .setAssignedPartitions(mkAssignmentWithEpochs(
                   mkTopicAssignmentWithEpochs(fooTopicId, 10, 0),
                   mkTopicAssignmentWithEpochs(zarTopicId, 11, 0)))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to