squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2584282720


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##########
@@ -375,11 +374,11 @@ public void testRemoveTaskProcessIds(TaskRole taskRole) {
         String fooSubtopologyId = "foo-sub";
         StreamsGroup streamsGroup = createStreamsGroup("foo");
 
-        // Removing should fail because there is no epoch set.
-        assertThrows(IllegalStateException.class, () -> streamsGroup.removeTaskProcessIds(
+        // Removing should be a no-op when there is no epoch set.

Review Comment:
   ```suggestion
           // Removing should be a no-op when there is no process id set.
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##########
@@ -388,11 +387,17 @@ public void testRemoveTaskProcessIds(TaskRole taskRole) {
 
         streamsGroup.updateMember(m1);
 
-        // Removing should fail because the expected epoch is incorrect.
-        assertThrows(IllegalStateException.class, () -> streamsGroup.removeTaskProcessIds(
-            TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 10, mkTasks(fooSubtopologyId, 1)),
+        // Removing with incorrect epoch should do nothing. 

Review Comment:
   ```suggestion
           // Removing with incorrect process id should do nothing. 
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##########
@@ -388,11 +387,17 @@ public void testRemoveTaskProcessIds(TaskRole taskRole) {
 
         streamsGroup.updateMember(m1);
 
-        // Removing should fail because the expected epoch is incorrect.
-        assertThrows(IllegalStateException.class, () -> streamsGroup.removeTaskProcessIds(
-            TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 10, mkTasks(fooSubtopologyId, 1)),
+        // Removing with incorrect epoch should do nothing. 
+        // A debug message is logged, no exception is thrown.
+        streamsGroup.removeTaskProcessIds(
+            TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 9, mkTasks(fooSubtopologyId, 1)),
             "process1"
-        ));
+        );

Review Comment:
   Can we assert that the process id is unchanged?
   ```java
   if (taskRole == TaskRole.ACTIVE) {
       assertEquals("process", 
streamsGroup.currentActiveTaskProcessId(fooSubtopologyId, 1));
   }
   ```
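   
   If we want to be thorough across roles, something like this could work (a sketch, assuming the `currentStandbyTaskProcessIds`/`currentWarmupTaskProcessIds` accessors return the owning process id sets; adjust the expected id to the test's setup):
   ```java
   // Sketch: a removal that does nothing must leave ownership intact for every role.
   switch (taskRole) {
       case ACTIVE:
           assertEquals("process", streamsGroup.currentActiveTaskProcessId(fooSubtopologyId, 1));
           break;
       case STANDBY:
           assertEquals(Set.of("process"), streamsGroup.currentStandbyTaskProcessIds(fooSubtopologyId, 1));
           break;
       case WARMUP:
           assertEquals(Set.of("process"), streamsGroup.currentWarmupTaskProcessIds(fooSubtopologyId, 1));
           break;
   }
   ```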



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,300 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
 
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(topicId, topicName, 2)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition 0.
+        // 2. Member A is unassigned partition 0 [record removed by compaction].
+        // 3. Member B is assigned partition 0.
+        // 4. Member A is assigned partition 1.
+        // If record 2 is processed, there are no issues, however with compaction it is possible that
+        // unassignment records are removed. We would like to not fail in these cases.
+        // Therefore we will allow assignments to owned partitions as long as the epoch is larger.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0's owner is replaced by member B at epoch 12.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0 must remain with member B at epoch 12 even though member A has just been unassigned partition 0.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+            .build()));
+
+        // Verify partition epochs.
+        ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+        assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+    }
+
+    @Test
+    public void testReplayConsumerGroupUnassignmentRecordSkippedWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 1)
+                .addTopic(barTopicId, barTopicName, 1)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition foo-1.
+        // 2. Member A is unassigned partition foo-1 [record removed by compaction].
+        // 3. Member B is assigned partition foo-1.
+        // 4. Member B is unassigned partition foo-1.
+        // 5. Member A is assigned partition bar-0.
+        // This is a legitimate set of assignments but with compaction the unassignment record can be skipped.
+        // This can lead to conflicts from updating an owned partition in step 3 and attempting to remove
+        // nonexistent ownership in step 5. We want to ensure both assignments are allowed.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .build()));
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .build()));
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .build()));
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(14)
+            .setPreviousMemberEpoch(13)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 0)))
+            .build()));
+
+        // Verify member A only has ownership of partition bar-0. Member B has no partitions.
+        ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(mkAssignment(mkTopicAssignment(barTopicId, 0)), group.members().get(memberIdA).assignedPartitions());
+        assertEquals(14, group.currentPartitionEpoch(barTopicId, 0));
+    }
+
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentSameEpochWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 1)
+                .addTopic(barTopicId, barTopicName, 1)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A unsubscribes from topic bar at epoch 10: Member A { epoch: 10, assigned partitions: [foo], pending revocations: [bar] }
+        // 2. A new assignment is available at epoch 11 with member A unsubscribing from topic foo.
+        // 3. Member A yields bar. The epoch is bumped to 11: Member A { epoch: 11, assigned partitions: [], pending revocations: [foo] }
+        // 4. Member A yields topic foo. Member A { epoch: 11, assigned partitions: [], pending revocations: [] } [removed by compaction]
+        // 5. Member B is assigned topic foo. Member B { epoch: 11, assigned partitions: [foo], pending revocations: [] }
+        // When record 4 is dropped by compaction, we want member B's assignment to be accepted with the same epoch.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(barTopicId, 0)))
+            .build()));
+
+        // Member A yields bar at epoch 11.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .build()));
+
+        // Member A yields foo. [record removed by compaction]
+        // Member B is assigned foo at epoch 11.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .build()));
+
+        // Verify partition foo-0 is assigned to member B at epoch 11.
+        ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(mkAssignment(mkTopicAssignment(fooTopicId, 0)), group.members().get(memberIdB).assignedPartitions());
+        assertEquals(11, group.currentPartitionEpoch(fooTopicId, 0));
+    }
+
+    @Test
+    public void testReplayStreamsGroupCurrentMemberAssignmentWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+        String processIdA = "processIdA";
+        String processIdB = "processIdB";
+
+        String subtopologyId = "subtopology";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
+
+        // Initialize members with process Ids.
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdA)
+            .setProcessId(processIdA)
+            .build()));
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdB)
+            .setProcessId(processIdB)
+            .build()));
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned task 0.
+        // 2. Member A is unassigned task 0 [record removed by compaction].
+        // 3. Member B is assigned task 0.
+        // 4. Member A is assigned task 1.
+        // If record 2 is processed, there are no issues, however with compaction it is possible that
+        // unassignment records are removed. We would like to not fail in these cases.
+        // Therefore we will allow assignments to owned tasks as long as the epoch is larger.
+
+        // Assign task 0 to member A.
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdA)
+            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyId, Map.of(0, 11))))
+            .build()));
+
+        // Task 0's owner is replaced by member B at epoch 12.
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdB)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyId, Map.of(0, 12))))
+            .build()));
+
+        // Task 0 must remain with member B at epoch 12 even though member A has just been unassigned task 1.

Review Comment:
   ```suggestion
           // Task 0 must remain with member B at epoch 12 even though member A has just been unassigned task 0.
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,300 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
 
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(topicId, topicName, 2)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition 0.
+        // 2. Member A is unassigned partition 0 [record removed by compaction].
+        // 3. Member B is assigned partition 0.
+        // 4. Member A is assigned partition 1.
+        // If record 2 is processed, there are no issues, however with compaction it is possible that
+        // unassignment records are removed. We would like to not fail in these cases.
+        // Therefore we will allow assignments to owned partitions as long as the epoch is larger.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0's owner is replaced by member B at epoch 12.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0 must remain with member B at epoch 12 even though member A has just been unassigned partition 0.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+            .build()));
+
+        // Verify partition epochs.
+        ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+        assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+    }
+
+    @Test
+    public void testReplayConsumerGroupUnassignmentRecordSkippedWithCompaction() {

Review Comment:
   ```suggestion
       public void testReplayConsumerGroupCurrentMemberAssignmentUnownedTopicWithCompaction() {
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,300 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
 
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(topicId, topicName, 2)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition 0.
+        // 2. Member A is unassigned partition 0 [record removed by compaction].
+        // 3. Member B is assigned partition 0.
+        // 4. Member A is assigned partition 1.
+        // If record 2 is processed, there are no issues, however with compaction it is possible that
+        // unassignment records are removed. We would like to not fail in these cases.
+        // Therefore we will allow assignments to owned partitions as long as the epoch is larger.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0's owner is replaced by member B at epoch 12.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0 must remain with member B at epoch 12 even though member A has just been unassigned partition 0.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+            .build()));
+
+        // Verify partition epochs.
+        ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+        assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+    }
+
+    @Test
+    public void testReplayConsumerGroupUnassignmentRecordSkippedWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 1)
+                .addTopic(barTopicId, barTopicName, 1)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition foo-1.
+        // 2. Member A is unassigned partition foo-1 [record removed by compaction].
+        // 3. Member B is assigned partition foo-1.
+        // 4. Member B is unassigned partition foo-1.
+        // 5. Member A is assigned partition bar-0.
+        // This is a legitimate set of assignments but with compaction the unassignment record can be skipped.
+        // This can lead to conflicts from updating an owned partition in step 3 and attempting to remove
+        // nonexistent ownership in step 5. We want to ensure both assignments are allowed.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .build()));
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)

Review Comment:
   ```suggestion
           // foo-0's owner is replaced by member B at epoch 12.
           context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,300 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
 
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(topicId, topicName, 2)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition 0.
+        // 2. Member A is unassigned partition 0 [record removed by compaction].
+        // 3. Member B is assigned partition 0.
+        // 4. Member A is assigned partition 1.
+        // If record 2 is processed, there are no issues, however with compaction it is possible that
+        // unassignment records are removed. We would like to not fail in these cases.
+        // Therefore we will allow assignments to owned partitions as long as the epoch is larger.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0's owner is replaced by member B at epoch 12.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0 must remain with member B at epoch 12 even though member A has just been unassigned partition 0.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+            .build()));
+
+        // Verify partition epochs.
+        ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+        assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+    }
+
+    @Test
+    public void testReplayConsumerGroupUnassignmentRecordSkippedWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 1)
+                .addTopic(barTopicId, barTopicName, 1)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition foo-1.
+        // 2. Member A is unassigned partition foo-1 [record removed by compaction].
+        // 3. Member B is assigned partition foo-1.
+        // 4. Member B is unassigned partition foo-1.
+        // 5. Member A is assigned partition bar-0.
+        // This is a legitimate set of assignments but with compaction the unassignment record can be skipped.
+        // This can lead to conflicts from updating an owned partition in step 3 and attempting to remove
+        // nonexistent ownership in step 5. We want to ensure both assignments are allowed.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .build()));
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .build()));
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)

Review Comment:
   ```suggestion
           // foo becomes unowned.
           context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,300 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
 
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(topicId, topicName, 2)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition 0.
+        // 2. Member A is unassigned partition 0 [record removed by compaction].
+        // 3. Member B is assigned partition 0.
+        // 4. Member A is assigned partition 1.
+        // If record 2 is processed, there are no issues, however with compaction it is possible that
+        // unassignment records are removed. We would like to not fail in these cases.
+        // Therefore we will allow assignments to owned partitions as long as the epoch is larger.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0's owner is replaced by member B at epoch 12.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0 must remain with member B at epoch 12 even though member A has just been unassigned partition 0.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+            .build()));
+
+        // Verify partition epochs.
+        ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+        assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+    }
+
+    @Test
+    public void testReplayConsumerGroupUnassignmentRecordSkippedWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 1)
+                .addTopic(barTopicId, barTopicName, 1)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition foo-1.
+        // 2. Member A is unassigned partition foo-1 [record removed by compaction].
+        // 3. Member B is assigned partition foo-1.
+        // 4. Member B is unassigned partition foo-1.

Review Comment:
   We actually use `foo-0` below.
   ```suggestion
           // 1. Member A is assigned partition foo-0.
           // 2. Member A is unassigned partition foo-0 [record removed by compaction].
           // 3. Member B is assigned partition foo-0.
           // 4. Member B is unassigned partition foo-0. 
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,300 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
 
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(topicId, topicName, 2)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition 0.
+        // 2. Member A is unassigned partition 0 [record removed by compaction].
+        // 3. Member B is assigned partition 0.
+        // 4. Member A is assigned partition 1.
+        // If record 2 is processed, there are no issues, however with compaction it is possible that
+        // unassignment records are removed. We would like to not fail in these cases.
+        // Therefore we will allow assignments to owned partitions as long as the epoch is larger.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0's owner is replaced by member B at epoch 12.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0 must remain with member B at epoch 12 even though member A has just been unassigned partition 0.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+            .build()));
+
+        // Verify partition epochs.
+        ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+        assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+    }
+
+    @Test
+    public void testReplayConsumerGroupUnassignmentRecordSkippedWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 1)
+                .addTopic(barTopicId, barTopicName, 1)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition foo-1.
+        // 2. Member A is unassigned partition foo-1 [record removed by compaction].
+        // 3. Member B is assigned partition foo-1.
+        // 4. Member B is unassigned partition foo-1.
+        // 5. Member A is assigned partition bar-0.
+        // This is a legitimate set of assignments but with compaction the unassignment record can be skipped.
+        // This can lead to conflicts from updating an owned partition in step 3 and attempting to remove
+        // nonexistent ownership in step 5. We want to ensure both assignments are allowed.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .build()));
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .build()));
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .build()));
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(14)
+            .setPreviousMemberEpoch(13)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 0)))
+            .build()));
+
+        // Verify member A only has ownership of partition bar-0. Member B has no partitions.
+        ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(mkAssignment(mkTopicAssignment(barTopicId, 0)), group.members().get(memberIdA).assignedPartitions());
+        assertEquals(14, group.currentPartitionEpoch(barTopicId, 0));

Review Comment:
   For completeness, let's check foo-0 as well.
   ```suggestion
           assertEquals(-1, group.currentPartitionEpoch(fooTopicId, 0));
           assertEquals(14, group.currentPartitionEpoch(barTopicId, 0));
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,300 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
 
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(topicId, topicName, 2)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition 0.
+        // 2. Member A is unassigned partition 0 [record removed by compaction].
+        // 3. Member B is assigned partition 0.
+        // 4. Member A is assigned partition 1.
+        // If record 2 is processed, there are no issues, however with compaction it is possible that
+        // unassignment records are removed. We would like to not fail in these cases.
+        // Therefore we will allow assignments to owned partitions as long as the epoch is larger.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0's owner is replaced by member B at epoch 12.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0 must remain with member B at epoch 12 even though member A has just been unassigned partition 0.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+            .build()));
+
+        // Verify partition epochs.
+        ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+        assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+    }
+
+    @Test
+    public void testReplayConsumerGroupUnassignmentRecordSkippedWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 1)
+                .addTopic(barTopicId, barTopicName, 1)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition foo-1.
+        // 2. Member A is unassigned partition foo-1 [record removed by compaction].
+        // 3. Member B is assigned partition foo-1.
+        // 4. Member B is unassigned partition foo-1.
+        // 5. Member A is assigned partition bar-0.
+        // This is a legitimate set of assignments but with compaction the unassignment record can be skipped.
+        // This can lead to conflicts from updating an owned partition in step 3 and attempting to remove
+        // nonexistent ownership in step 5. We want to ensure both assignments are allowed.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .build()));
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .build()));
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .build()));
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(14)
+            .setPreviousMemberEpoch(13)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 0)))
+            .build()));
+
+        // Verify member A only has ownership of partition bar-0. Member B has no partitions.
+        ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(mkAssignment(mkTopicAssignment(barTopicId, 0)), group.members().get(memberIdA).assignedPartitions());
+        assertEquals(14, group.currentPartitionEpoch(barTopicId, 0));
+    }
+
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentSameEpochWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 1)
+                .addTopic(barTopicId, barTopicName, 1)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A unsubscribes from topic bar at epoch 10: Member A { epoch: 10, assigned partitions: [foo], pending revocations: [bar] }
+        // 2. A new assignment is available at epoch 11 with member A unsubscribing from topic foo.
+        // 3. Member A yields bar. The epoch is bumped to 11: Member A { epoch: 11, assigned partitions: [], pending revocations: [foo] }
+        // 4. Member A yields topic foo. Member A { epoch: 11, assigned partitions: [], pending revocations: [] } [removed by compaction]
+        // 5. Member B is assigned topic foo. Member B { epoch: 11, assigned partitions: [foo], pending revocations: [] }
+        // When record 4 is dropped by compaction, we want member B's assignment to be accepted with the same epoch.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(barTopicId, 0)))
+            .build()));
+
+        // Member A yields bar at epoch 11.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .build()));
+
+        // Member A yields foo. [record removed by compaction]
+        // Member B is assigned foo at epoch 11.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .build()));
+
+        // Verify partition foo-0 is assigned to member B at epoch 11.
+        ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(mkAssignment(mkTopicAssignment(fooTopicId, 0)), group.members().get(memberIdB).assignedPartitions());
+        assertEquals(11, group.currentPartitionEpoch(fooTopicId, 0));
+    }
+
+    @Test
+    public void testReplayStreamsGroupCurrentMemberAssignmentWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+        String processIdA = "processIdA";
+        String processIdB = "processIdB";
+
+        String subtopologyId = "subtopology";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
+
+        // Initialize members with process Ids.
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdA)
+            .setProcessId(processIdA)
+            .build()));
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdB)
+            .setProcessId(processIdB)
+            .build()));
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned task 0.
+        // 2. Member A is unassigned task 0 [record removed by compaction].
+        // 3. Member B is assigned task 0.
+        // 4. Member A is assigned task 1.
+        // If record 2 is processed, there are no issues, however with compaction it is possible that
+        // unassignment records are removed. We would like to not fail in these cases.
+        // Therefore we will allow assignments to owned tasks as long as the epoch is larger.
+
+        // Assign task 0 to member A.
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdA)
+            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyId, Map.of(0, 11))))
+            .build()));
+
+        // Task 0's owner is replaced by member B at epoch 12.
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdB)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyId, Map.of(0, 12))))
+            .build()));
+
+        // Task 0 must remain with member B at epoch 12 even though member A has just been unassigned task 1.
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdA)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyId, Map.of(1, 13))))
+            .build()));
+
+        // Check task 1 is assigned to member A and task 0 to member B.
+        StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId);
+        assertEquals(processIdA, group.currentActiveTaskProcessId(subtopologyId, 1));
+        assertEquals(processIdB, group.currentActiveTaskProcessId(subtopologyId, 0));
+    }
+
+    @Test
+    public void testReplayStreamsGroupUnassignmentRecordSkippedWithCompaction() {

Review Comment:
   ```suggestion
       public void testReplayStreamsGroupCurrentMemberAssignmentUnownedTopologyWithCompaction() {
   ```
   
   And could you make the structure of this test mirror the consumer groups one? They're out of sync.
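   
   For the mirroring, a rough sketch of the shape I had in mind (illustrative only; `fooSubtopologyId`/`barSubtopologyId`, the epochs, and the unowned-task assertion are assumptions, reusing the member/process setup from the test above rather than verified code):
   ```java
   // 1. Member A is assigned task foo-0.
   // 2. Member A is unassigned task foo-0 [record removed by compaction].
   // 3. Member B is assigned task foo-0.
   // 4. Member B is unassigned task foo-0.
   // 5. Member A is assigned task bar-0.
   context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdA)
       .setMemberEpoch(11)
       .setPreviousMemberEpoch(10)
       .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
               TaskAssignmentTestUtil.mkTasksWithEpochs(fooSubtopologyId, Map.of(0, 11))))
       .build()));
   
   // foo-0's owner is replaced by member B at epoch 12.
   context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdB)
       .setMemberEpoch(12)
       .setPreviousMemberEpoch(11)
       .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
               TaskAssignmentTestUtil.mkTasksWithEpochs(fooSubtopologyId, Map.of(0, 12))))
       .build()));
   
   // foo-0 becomes unowned.
   context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdB)
       .setMemberEpoch(13)
       .setPreviousMemberEpoch(12)
       .build()));
   
   // Member A acquires bar-0 even though its foo-0 unassignment record was compacted away.
   context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdA)
       .setMemberEpoch(14)
       .setPreviousMemberEpoch(13)
       .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
               TaskAssignmentTestUtil.mkTasksWithEpochs(barSubtopologyId, Map.of(0, 14))))
       .build()));
   
   // Verify bar-0 is owned by member A's process and foo-0 ended up unowned
   // (assuming currentActiveTaskProcessId returns null for unowned tasks).
   StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId);
   assertEquals(processIdA, group.currentActiveTaskProcessId(barSubtopologyId, 0));
   assertNull(group.currentActiveTaskProcessId(fooSubtopologyId, 0));
   ```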



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -286,23 +287,37 @@ public void testUpdatingMemberUpdatesPartitionEpochWhenPartitionIsNotReleased()
                 mkTopicAssignment(fooTopicId, 1)))
             .build();
 
-        // m2 should not be able to acquire foo-1 because the partition is
-        // still owned by another member.
-        assertThrows(IllegalStateException.class, () -> consumerGroup.updateMember(m2));
+        // m2 can acquire foo-1 because the epoch is at least as large as m1's epoch.
+        consumerGroup.updateMember(m2);
+        assertEquals(mkAssignment(mkTopicAssignment(fooTopicId, 1)),
+            consumerGroup.getOrMaybeCreateMember("m1", false).assignedPartitions()
+        );

Review Comment:
   I'm strongly in favor of removing the asserts on `member.assignedPartitions()` in the epoch/process id-based tests. The partition epoch methods aren't expected to ever touch the member assignment, since the Member objects are designed to be immutable.
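   
   That is, keep only the epoch-side check, roughly (a sketch; the expected value should follow the test's setup):
   ```java
   // m2 can acquire foo-1 because its epoch is at least as large as m1's epoch.
   consumerGroup.updateMember(m2);
   
   // Assert only on the epoch bookkeeping; m1's Member object is immutable,
   // so m1.assignedPartitions() cannot have changed and needn't be re-checked.
   assertEquals(m2.memberEpoch(), consumerGroup.currentPartitionEpoch(fooTopicId, 1));
   ```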



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,300 @@ public void 
testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
 
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() 
{
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(topicId, topicName, 2)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition 0.
+        // 2. Member A is unassigned partition 0 [record removed by 
compaction].
+        // 3. Member B is assigned partition 0. 
+        // 4. Member A is assigned partition 1. 
+        // If record 2 is processed, there are no issues, however with 
compaction it is possible that 
+        // unassignment records are removed. We would like to not fail in 
these cases.
+        // Therefore we will allow assignments to owned partitions as long as 
the epoch is larger. 
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0's owner is replaced by member B at epoch 12.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0 must remain with member B at epoch 12 even though 
member A has just been unassigned partition 0.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+            .build()));
+
+        // Verify partition epochs.
+        ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+        assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+    }
+
+    @Test
+    public void 
testReplayConsumerGroupUnassignmentRecordSkippedWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 1)
+                .addTopic(barTopicId, barTopicName, 1)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition foo-0.
+        // 2. Member A is unassigned partition foo-0 [record removed by compaction].
+        // 3. Member B is assigned partition foo-0.
+        // 4. Member B is unassigned partition foo-0.
+        // 5. Member A is assigned partition bar-0.
+        // This is a legitimate set of assignments, but with compaction the unassignment record can be skipped.
+        // This can lead to conflicts from updating an owned partition in step 3 and attempting to remove
+        // nonexistent ownership in step 5. We want to ensure both assignments are allowed.
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            .build()));
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            .build()));
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .build()));
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(14)
+            .setPreviousMemberEpoch(13)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 
0)))
+            .build()));
+
+        // Verify member A only has ownership of partition bar-0. Member B has 
no partitions.
+        ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(mkAssignment(mkTopicAssignment(barTopicId, 0)), 
group.members().get(memberIdA).assignedPartitions());
+        assertEquals(14, group.currentPartitionEpoch(barTopicId, 0));
+    }
+
+    @Test
+    public void 
testReplayConsumerGroupCurrentMemberAssignmentSameEpochWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 1)
+                .addTopic(barTopicId, barTopicName, 1)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A unsubscribes from topic bar at epoch 10: Member A { 
epoch: 10, assigned partitions: [foo], pending revocations: [bar] }
+        // 2. A new assignment is available at epoch 11 with member A 
unsubscribing from topic foo.
+        // 3. Member A yields bar. The epoch is bumped to 11: Member A { 
epoch: 11, assigned partitions: [], pending revocations: [foo] }
+        // 4. Member A yields topic foo. Member A { epoch: 11, assigned 
partitions: [], pending revocations: [] } [removed by compaction]
+        // 5. Member B is assigned topic foo. Member B { epoch: 11, assigned 
partitions: [foo], pending revocations: [] }
+        // When record 4 is dropped by compaction, we want member B's 
assignment to be accepted with the same epoch. 
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(barTopicId, 0)))
+            .build()));
+
+        // Member A yields bar at epoch 11.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .build()));
+
+        // Member A yields foo. [record removed by compaction]
+        // Member B is assigned foo at epoch 11.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            .build()));
+
+        // Verify partition foo-0 is assigned to member B at epoch 11.
+        ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(mkAssignment(mkTopicAssignment(fooTopicId, 0)), 
group.members().get(memberIdB).assignedPartitions());
+        assertEquals(11, group.currentPartitionEpoch(fooTopicId, 0));
+    }
+
+    @Test
+    public void testReplayStreamsGroupCurrentMemberAssignmentWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+        String processIdA = "processIdA";
+        String processIdB = "processIdB";
+
+        String subtopologyId = "subtopology";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder().build();
+
+        // Initialize members with process IDs.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdA)
+            .setProcessId(processIdA)
+            .build()));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdB)
+            .setProcessId(processIdB)
+            .build()));
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned task 0.
+        // 2. Member A is unassigned task 0 [record removed by compaction].
+        // 3. Member B is assigned task 0. 
+        // 4. Member A is assigned task 1. 
+        // If record 2 is processed, there are no issues; however, with compaction it is possible that
+        // unassignment records are removed. We do not want to fail in these cases.
+        // Therefore we allow assignments to owned tasks as long as the epoch is larger.
+
+        // Assign task 0 to member A.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdA)
+            
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
 
+                    TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyId, 
Map.of(0, 11))))
+            .build()));
+
+        // Task 0's owner is replaced by member B at epoch 12.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdB)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
 
+                    TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyId, 
Map.of(0, 12))))
+            .build()));
+
+        // Task 0 must remain with member B at epoch 12 even though member A has just been unassigned task 0.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdA)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
 
+                    TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyId, 
Map.of(1, 13))))
+            .build()));
+
+        // Check task 1 is assigned to member A and task 0 to member B.
+        StreamsGroup group = 
context.groupMetadataManager.streamsGroup(groupId);
+        assertEquals(processIdA, 
group.currentActiveTaskProcessId(subtopologyId, 1));
+        assertEquals(processIdB, 
group.currentActiveTaskProcessId(subtopologyId, 0));
+    }
+
+    @Test
+    public void 
testReplayStreamsGroupUnassignmentRecordSkippedWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+        String processIdA = "processIdA";
+        String processIdB = "processIdB";
+
+        String subtopologyFoo = "subtopologyFoo";
+        String subtopologyBar = "subtopologyBar";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder().build();
+
+        // Initialize members with process IDs.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdA)
+            .setProcessId(processIdA)
+            .build()));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdB)
+            .setProcessId(processIdB)
+            .build()));
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned task foo-0.
+        // 2. Member A is unassigned task foo-0 [record removed by compaction].
+        // 3. Member B is assigned task foo-0.
+        // 4. Member B is unassigned task foo-0.
+        // 5. Member A is assigned task bar-0.
+        // This is a legitimate set of assignments, but with compaction the unassignment record can be skipped.
+        // We do not want to fail in these cases; both the assignment of member B to foo-0 and
+        // of member A to bar-0 should succeed because the epochs are larger.
+
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdA)
+            
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
 
+                TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyFoo, 
Map.of(0, 11, 1, 11))))

Review Comment:
   ```suggestion
                   TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyFoo, 
Map.of(0, 11))))
   ```
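   (This keeps member A's initial assignment to just task foo-0, matching the single contested task in the scenario comment above.)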



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,300 @@ public void 
testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
 
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() 
{
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(topicId, topicName, 2)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition 0.
+        // 2. Member A is unassigned partition 0 [record removed by 
compaction].
+        // 3. Member B is assigned partition 0. 
+        // 4. Member A is assigned partition 1. 
+        // If record 2 is processed, there are no issues; however, with compaction it is possible that
+        // unassignment records are removed. We do not want to fail in these cases.
+        // Therefore we allow assignments to owned partitions as long as the epoch is larger.
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0's owner is replaced by member B at epoch 12.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0 must remain with member B at epoch 12 even though 
member A has just been unassigned partition 0.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+            .build()));
+
+        // Verify partition epochs.
+        ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+        assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+    }
+
+    @Test
+    public void 
testReplayConsumerGroupUnassignmentRecordSkippedWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 1)
+                .addTopic(barTopicId, barTopicName, 1)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition foo-0.
+        // 2. Member A is unassigned partition foo-0 [record removed by compaction].
+        // 3. Member B is assigned partition foo-0.
+        // 4. Member B is unassigned partition foo-0.
+        // 5. Member A is assigned partition bar-0.
+        // This is a legitimate set of assignments, but with compaction the unassignment record can be skipped.
+        // This can lead to conflicts from updating an owned partition in step 3 and attempting to remove
+        // nonexistent ownership in step 5. We want to ensure both assignments are allowed.

Review Comment:
   We're not really interested in step 3 because that's covered by the test 
case above.
   ```suggestion
           // This can lead to conflicts from updating an owned partition in 
step 3 and attempting to remove
           // nonexistent ownership in step 5. We want to ensure that removing 
ownership from a
           // completely unowned topic in step 5 is allowed.
   ```
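
   If we also want to pin down the end state, something along these lines could work (a sketch, assuming `currentPartitionEpoch` returns -1 for an unowned partition):
   ```java
   // foo-0 should be left unowned after member B's unassignment in step 4.
   assertEquals(-1, group.currentPartitionEpoch(fooTopicId, 0));
   ```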



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,300 @@ public void 
testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
 
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() 
{
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(topicId, topicName, 2)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition 0.
+        // 2. Member A is unassigned partition 0 [record removed by 
compaction].
+        // 3. Member B is assigned partition 0. 
+        // 4. Member A is assigned partition 1. 
+        // If record 2 is processed, there are no issues; however, with compaction it is possible that
+        // unassignment records are removed. We do not want to fail in these cases.
+        // Therefore we allow assignments to owned partitions as long as the epoch is larger.
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0's owner is replaced by member B at epoch 12.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0 must remain with member B at epoch 12 even though 
member A has just been unassigned partition 0.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+            .build()));
+
+        // Verify partition epochs.
+        ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+        assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+    }
+
+    @Test
+    public void 
testReplayConsumerGroupUnassignmentRecordSkippedWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 1)
+                .addTopic(barTopicId, barTopicName, 1)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition foo-0.
+        // 2. Member A is unassigned partition foo-0 [record removed by compaction].
+        // 3. Member B is assigned partition foo-0.
+        // 4. Member B is unassigned partition foo-0.
+        // 5. Member A is assigned partition bar-0.
+        // This is a legitimate set of assignments, but with compaction the unassignment record can be skipped.
+        // This can lead to conflicts from updating an owned partition in step 3 and attempting to remove
+        // nonexistent ownership in step 5. We want to ensure both assignments are allowed.
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            .build()));
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            .build()));
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .build()));
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(14)
+            .setPreviousMemberEpoch(13)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 
0)))
+            .build()));
+
+        // Verify member A only has ownership of partition bar-0. Member B has 
no partitions.
+        ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(mkAssignment(mkTopicAssignment(barTopicId, 0)), 
group.members().get(memberIdA).assignedPartitions());
+        assertEquals(14, group.currentPartitionEpoch(barTopicId, 0));
+    }
+
+    @Test
+    public void 
testReplayConsumerGroupCurrentMemberAssignmentSameEpochWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 1)
+                .addTopic(barTopicId, barTopicName, 1)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A unsubscribes from topic bar at epoch 10: Member A { 
epoch: 10, assigned partitions: [foo], pending revocations: [bar] }
+        // 2. A new assignment is available at epoch 11 with member A 
unsubscribing from topic foo.
+        // 3. Member A yields bar. The epoch is bumped to 11: Member A { 
epoch: 11, assigned partitions: [], pending revocations: [foo] }
+        // 4. Member A yields topic foo. Member A { epoch: 11, assigned 
partitions: [], pending revocations: [] } [removed by compaction]
+        // 5. Member B is assigned topic foo. Member B { epoch: 11, assigned 
partitions: [foo], pending revocations: [] }
+        // When record 4 is dropped by compaction, we want member B's 
assignment to be accepted with the same epoch. 
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(barTopicId, 0)))
+            .build()));
+
+        // Member A yields bar at epoch 11.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .build()));
+
+        // Member A yields foo. [record removed by compaction]
+        // Member B is assigned foo at epoch 11.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            .build()));
+
+        // Verify partition foo-0 is assigned to member B at epoch 11.
+        ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(mkAssignment(mkTopicAssignment(fooTopicId, 0)), 
group.members().get(memberIdB).assignedPartitions());
+        assertEquals(11, group.currentPartitionEpoch(fooTopicId, 0));
+    }
+
+    @Test
+    public void testReplayStreamsGroupCurrentMemberAssignmentWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+        String processIdA = "processIdA";
+        String processIdB = "processIdB";
+
+        String subtopologyId = "subtopology";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder().build();
+
+        // Initialize members with process IDs.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdA)
+            .setProcessId(processIdA)
+            .build()));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdB)
+            .setProcessId(processIdB)
+            .build()));
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned task 0.
+        // 2. Member A is unassigned task 0 [record removed by compaction].
+        // 3. Member B is assigned task 0. 
+        // 4. Member A is assigned task 1. 
+        // If record 2 is processed, there are no issues; however, with compaction it is possible that
+        // unassignment records are removed. We do not want to fail in these cases.
+        // Therefore we allow assignments to owned tasks as long as the epoch is larger.
+
+        // Assign task 0 to member A.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdA)
+            
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
 
+                    TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyId, 
Map.of(0, 11))))
+            .build()));
+
+        // Task 0's owner is replaced by member B at epoch 12.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdB)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
 
+                    TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyId, 
Map.of(0, 12))))
+            .build()));
+
+        // Task 0 must remain with member B at epoch 12 even though member A has just been unassigned task 0.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdA)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
 
+                    TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyId, 
Map.of(1, 13))))
+            .build()));
+
+        // Check task 1 is assigned to member A and task 0 to member B.
+        StreamsGroup group = 
context.groupMetadataManager.streamsGroup(groupId);
+        assertEquals(processIdA, 
group.currentActiveTaskProcessId(subtopologyId, 1));
+        assertEquals(processIdB, 
group.currentActiveTaskProcessId(subtopologyId, 0));
+    }
+
+    @Test
+    public void 
testReplayStreamsGroupUnassignmentRecordSkippedWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+        String processIdA = "processIdA";
+        String processIdB = "processIdB";
+
+        String subtopologyFoo = "subtopologyFoo";
+        String subtopologyBar = "subtopologyBar";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder().build();
+
+        // Initialize members with process IDs.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdA)
+            .setProcessId(processIdA)
+            .build()));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdB)
+            .setProcessId(processIdB)
+            .build()));
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned task foo-0.
+        // 2. Member A is unassigned task foo-0 [record removed by compaction].
+        // 3. Member B is assigned task foo-0.
+        // 4. Member B is unassigned task foo-0.
+        // 5. Member A is assigned task bar-0.
+        // This is a legitimate set of assignments, but with compaction the unassignment record can be skipped.
+        // We do not want to fail in these cases; both the assignment of member B to foo-0 and
+        // of member A to bar-0 should succeed because the epochs are larger.
+
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdA)
+            
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
 
+                TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyFoo, 
Map.of(0, 11, 1, 11))))
+            .build()));
+
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdB)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
 
+                TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyFoo, 
Map.of(0, 12))))
+            .build()));
+
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdB)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .build()));
+
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdA)
+            .setMemberEpoch(14)
+            .setPreviousMemberEpoch(13)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
 
+                TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyBar, 
Map.of(0, 14))))
+            .build()));
+
+        // Check task bar-0 is assigned to member A. Member B has no tasks.
+        StreamsGroup group = 
context.groupMetadataManager.streamsGroup(groupId);
+        assertEquals(processIdA, 
group.currentActiveTaskProcessId(subtopologyBar, 0));

Review Comment:
   Let's check both tasks.
   ```suggestion
           assertEquals(null, group.currentActiveTaskProcessId(subtopologyFoo, 
0));
           assertEquals(processIdA, 
group.currentActiveTaskProcessId(subtopologyBar, 0));
   ```
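   (`assertNull(group.currentActiveTaskProcessId(subtopologyFoo, 0))` would arguably read better than `assertEquals(null, ...)`, assuming JUnit's `assertNull` is already imported.)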



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,300 @@ public void 
testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
 
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() 
{
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(topicId, topicName, 2)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition 0.
+        // 2. Member A is unassigned partition 0 [record removed by 
compaction].
+        // 3. Member B is assigned partition 0. 
+        // 4. Member A is assigned partition 1. 
+        // If record 2 is processed, there are no issues; however, with compaction it is possible that
+        // unassignment records are removed. We do not want to fail in these cases.
+        // Therefore we allow assignments to owned partitions as long as the epoch is larger.
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0's owner is replaced by member B at epoch 12.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0 must remain with member B at epoch 12 even though 
member A has just been unassigned partition 0.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+            .build()));
+
+        // Verify partition epochs.
+        ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+        assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+    }
+
+    @Test
+    public void 
testReplayConsumerGroupUnassignmentRecordSkippedWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 1)
+                .addTopic(barTopicId, barTopicName, 1)
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition foo-0.
+        // 2. Member A is unassigned partition foo-0 [record removed by compaction].
+        // 3. Member B is assigned partition foo-0.
+        // 4. Member B is unassigned partition foo-0.
+        // 5. Member A is assigned partition bar-0.
+        // This is a legitimate set of assignments, but with compaction the unassignment record can be skipped.
+        // This can lead to conflicts from updating an owned partition in step 3 and attempting to remove
+        // nonexistent ownership in step 5. We want to ensure both assignments are allowed.
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            .build()));
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            .build()));
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .build()));
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)

Review Comment:
   ```suggestion
           // Member A is unassigned foo-0.
           
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -1027,12 +1027,28 @@ private void addTaskProcessIdFromActiveTasksWithEpochs(
                 if (partitionsOrNull == null) {
                     partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, 
assignedTaskPartitionsWithEpochs.size());
                 }
-                for (Integer partitionId : 
assignedTaskPartitionsWithEpochs.keySet()) {
-                    String prevValue = partitionsOrNull.put(partitionId, 
processId);
-                    if (prevValue != null) {
-                        throw new IllegalStateException(
-                            String.format("Cannot set the process ID of %s-%s 
to %s because the partition is " +
-                                "still owned by process ID %s", subtopologyId, 
partitionId, processId, prevValue));
+                for (Map.Entry<Integer, Integer> partitionEntry : 
assignedTaskPartitionsWithEpochs.entrySet()) {
+                    Integer partitionId = partitionEntry.getKey();
+                    String prevValue = partitionsOrNull.get(partitionId);
+                    
+                    if (prevValue == null) {
+                        partitionsOrNull.put(partitionId, processId);
+                    } else {
+                        String memberId = null;
+                        for (Map.Entry<String, StreamsGroupMember> memberEntry 
: members.entrySet()) {
+                            if 
(memberEntry.getValue().processId().equals(prevValue)) {
+                                memberId = memberEntry.getKey();
+                                break;
+                            }
+                        }
+                        if (memberId != null && 
+                            
members.get(memberId).assignedTasks().activeTasksWithEpochs().get(subtopologyId).get(partitionId)
 <= partitionEntry.getValue()) {
+                            partitionsOrNull.put(partitionId, processId);
+                        } else {
+                            throw new IllegalStateException(
+                                String.format("[GroupId %s] Cannot set the process ID of %s-%s to %s because the partition is " +
+                                    "still owned by process ID %s", groupId, subtopologyId, partitionId, processId, prevValue));
+                        }

Review Comment:
   Iterating through all the members of the group seems like a huge hassle, and when groups have many members it could become a bottleneck. I would propose dropping the check for epochs going backwards for streams.
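
   A rough sketch of the simplified path without the epoch check (hypothetical; last writer wins, since a compacted-away unassignment record can legitimately leave a stale owner entry behind):
   ```java
   for (Integer partitionId : assignedTaskPartitionsWithEpochs.keySet()) {
       // Unconditionally take ownership: a stale entry left by a record that
       // compaction removed is expected here and is safely overwritten.
       partitionsOrNull.put(partitionId, processId);
   }
   ```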



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
