Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-09 Thread via GitHub


dajac commented on PR #20907:
URL: https://github.com/apache/kafka/pull/20907#issuecomment-3635836846

   Merged to trunk and 4.2. I could not cherry-pick it to 4.1 and 4.0. 
@izzyharker Could you please raise a PR for those branches? I think that it is 
worth fixing this for future minor releases.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-09 Thread via GitHub


dajac merged PR #20907:
URL: https://github.com/apache/kafka/pull/20907


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-08 Thread via GitHub


squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2601194658


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -24387,6 +24387,287 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
 
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+        Uuid topicId = Uuid.randomUuid();
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition 0.
+        // 2. Member A is unassigned partition 0 [record removed by compaction].
+        // 3. Member B is assigned partition 0.
+        // 4. Member A is assigned partition 1.
+        // If record 2 is processed, there are no issues, however with compaction it is possible that
+        // unassignment records are removed. We would like to not fail in these cases.
+        // Therefore we will allow assignments to owned partitions as long as the epoch is larger.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0's owner is replaced by member B at epoch 12.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0 must remain with member B at epoch 12 even though member A has just been unassigned partition 0.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+            .build()));
+
+        // Verify partition epochs.
+        ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+        assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+    }
+
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentUnownedTopicWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition foo-0.
+        // 2. Member A is unassigned partition foo-0 [record removed by compaction].
+        // 3. Member B is assigned partition foo-0.
+        // 4. Member B is unassigned partition foo-0.
+        // 5. Member A is assigned partition bar-0.
+        // This is a legitimate set of assignments but with compaction the unassignment record can be skipped.
+        // This can lead to conflicts from updating an owned partition in step 3 and attempting
+        // to remove nonexistent ownership in step 5. We want to ensure removing ownership from a
+        // completely unowned partition in step 5 is allowed.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .build()));
+
+        // foo-0's owner is replaced by member B at epoch 12.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .build()));
+
+        // foo becomes unowned.
+        context.replay(GroupCoordinatorRe
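
A minimal, self-contained sketch of the epoch-tolerant ownership update described in the scenario comments above. The class and method names here are illustrative only, not the actual ConsumerGroup API; the real change is in ConsumerGroup#addPartitionEpochs, quoted later in the thread.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: partition ownership keyed by epoch, tolerant of
// unassignment records that log compaction may have removed.
public class PartitionEpochs {
    // partition id -> epoch of the member currently recorded as owner
    private final Map<Integer, Integer> epochs = new HashMap<>();

    // With compaction, the record that released a partition may be gone, so an
    // assignment at an equal or newer epoch overwrites the stale owner instead
    // of failing the coordinator load. Only an older epoch is rejected.
    public void assign(int partitionId, int epoch) {
        Integer prev = epochs.get(partitionId);
        if (prev == null || prev <= epoch) {
            epochs.put(partitionId, epoch);
        } else {
            throw new IllegalStateException(
                "Partition " + partitionId + " is already owned at newer epoch " + prev);
        }
    }

    public Integer epochOf(int partitionId) {
        return epochs.get(partitionId);
    }
}
```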

Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-08 Thread via GitHub


izzyharker commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2599840182



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-08 Thread via GitHub


squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2599630319



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-08 Thread via GitHub


izzyharker commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2599620157



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-08 Thread via GitHub


dajac commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2598541341


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1087,8 +1100,10 @@ void addPartitionEpochs(
                 partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedPartitions.size());
             }
             for (Integer partitionId : assignedPartitions) {
-                Integer prevValue = partitionsOrNull.put(partitionId, epoch);
-                if (prevValue != null) {
+                Integer prevValue = partitionsOrNull.get(partitionId);
+                if (prevValue == null || prevValue <= epoch) {

Review Comment:
   We can revert this change and remove the test as this is addressed by 
https://github.com/apache/kafka/pull/21107.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-08 Thread via GitHub


dajac commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2598539084



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-08 Thread via GitHub


dajac commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2598538427



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-08 Thread via GitHub


squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2594696824




Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-06 Thread via GitHub


squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2594692440


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##
@@ -1030,9 +1030,8 @@ private void addTaskProcessIdFromActiveTasksWithEpochs(
             for (Integer partitionId : assignedTaskPartitionsWithEpochs.keySet()) {
                 String prevValue = partitionsOrNull.put(partitionId, processId);
                 if (prevValue != null) {
-                    throw new IllegalStateException(
-                        String.format("Cannot set the process ID of %s-%s to %s because the partition is " +
-                            "still owned by process ID %s", subtopologyId, partitionId, processId, prevValue));
+                    log.debug("[GroupId {}]Setting the process ID of {}-{} to {} even though the partition is " +

Review Comment:
   ```suggestion
                    log.debug("[GroupId {}] Setting the process ID of {}-{} to {} even though the partition is " +
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
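
A minimal sketch of the log-and-overwrite pattern the suggestion above is refining, assuming a plain HashMap of partition id to process id rather than the TimelineHashMap used by StreamsGroup; the class and method names are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative sketch: during replay, a partition that still appears owned by
// another process is logged at debug and overwritten instead of failing,
// because compaction may have dropped the record that released it.
public class TaskProcessIds {
    private static final Logger log = LoggerFactory.getLogger(TaskProcessIds.class);

    // partition id -> process id recorded as the current owner
    private final Map<Integer, String> processIds = new HashMap<>();

    public void put(String groupId, String subtopologyId, int partitionId, String processId) {
        String prevValue = processIds.put(partitionId, processId);
        if (prevValue != null) {
            log.debug("[GroupId {}] Setting the process ID of {}-{} to {} even though the partition is " +
                "still owned by process ID {}", groupId, subtopologyId, partitionId, processId, prevValue);
        }
    }
}
```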



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-05 Thread via GitHub


izzyharker commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2593425805



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-05 Thread via GitHub


dajac commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2592662727


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1087,8 +1100,10 @@ void addPartitionEpochs(
                 partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedPartitions.size());
             }
             for (Integer partitionId : assignedPartitions) {
-                Integer prevValue = partitionsOrNull.put(partitionId, epoch);
-                if (prevValue != null) {
+                Integer prevValue = partitionsOrNull.get(partitionId);
+                if (prevValue == null || prevValue <= epoch) {

Review Comment:
   Step 4. does not seem correct to me in this example as I explained in the 
comment about the test itself.




Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-05 Thread via GitHub


lucasbru commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2592077338


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##
@@ -980,9 +981,8 @@ private void removeTaskProcessIdsFromSet(
         if (partitionsOrNull != null) {
             assignedPartitions.forEach(partitionId -> {
                 if (!partitionsOrNull.get(partitionId).remove(processIdToRemove)) {

Review Comment:
   ```suggestion
                if (!partitionsOrNull.containsKey(partitionId) || !partitionsOrNull.get(partitionId).remove(processIdToRemove)) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
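
A minimal sketch of the defensive removal the suggestion above points at, assuming a plain map of partition id to the set of owning process ids; the field and method names are placeholders, not the StreamsGroup API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative sketch: removing a process id from a partition tolerates both
// a missing partition entry and a missing process id, since compaction can
// leave the replayed state without the record that established ownership.
public class TaskProcessIdSets {
    private final Map<Integer, Set<String>> processIdsByPartition = new HashMap<>();

    public void remove(int partitionId, String processIdToRemove) {
        Set<String> owners = processIdsByPartition.get(partitionId);
        if (owners == null || !owners.remove(processIdToRemove)) {
            // Nothing to remove; the ownership record may already have been compacted away.
            return;
        }
        if (owners.isEmpty()) {
            processIdsByPartition.remove(partitionId);
        }
    }
}
```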



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-03 Thread via GitHub


squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2587851616


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##
@@ -1027,12 +1027,11 @@ private void addTaskProcessIdFromActiveTasksWithEpochs(
             if (partitionsOrNull == null) {
                 partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedTaskPartitionsWithEpochs.size());
             }
-            for (Integer partitionId : assignedTaskPartitionsWithEpochs.keySet()) {
+            for (Integer partitionId: assignedTaskPartitionsWithEpochs.keySet()) {
                 String prevValue = partitionsOrNull.put(partitionId, processId);
                 if (prevValue != null) {
-                    throw new IllegalStateException(
-                        String.format("Cannot set the process ID of %s-%s to %s because the partition is " +
-                            "still owned by process ID %s", subtopologyId, partitionId, processId, prevValue));
+                    log.debug("[GroupId {}] Cannot set the process ID of {}-{} to {} because the partition is " +
+                        "still owned by process ID {}", groupId, subtopologyId, partitionId, processId, prevValue);

Review Comment:
   Technically we are actually setting the process ID. Let's update the log message to be more accurate.
   ```suggestion
                    log.debug("[GroupId {}] Setting the process ID of {}-{} to {} even though the partition is " +
                        "still owned by process ID {}", groupId, subtopologyId, partitionId, processId, prevValue);
   ```



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##
@@ -1027,12 +1027,11 @@ private void addTaskProcessIdFromActiveTasksWithEpochs(
             if (partitionsOrNull == null) {
                 partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedTaskPartitionsWithEpochs.size());
             }
-            for (Integer partitionId : assignedTaskPartitionsWithEpochs.keySet()) {
+            for (Integer partitionId: assignedTaskPartitionsWithEpochs.keySet()) {

Review Comment:
   nit: missing space
   ```suggestion
            for (Integer partitionId : assignedTaskPartitionsWithEpochs.keySet()) {
   ```



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##
@@ -1004,7 +1004,7 @@ private void removeTaskProcessIdsFromSet(
      *
      * @param tasks The assigned tasks.
      * @param processId The process ID.
-     * @throws IllegalStateException if the partition already has an epoch assigned. package-private for testing.
+     * @throws IllegalStateException if the existing partition has larger epoch than the new one. package-private for testing.

Review Comment:
   I think we don't compare epochs for streams now?
   ```suggestion
     * package-private for testing.
   ```



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##
@@ -312,13 +325,19 @@ public void testRemovePartitionEpochs() {
 
         consumerGroup.updateMember(m1);
 
-        // Removing should fail because the expected epoch is incorrect.
-        assertThrows(IllegalStateException.class, () -> consumerGroup.removePartitionEpochs(
+        // Removing with incorrect epoch should do nothing.
+        // A debug message is logged, no exception is thrown.
+        consumerGroup.removePartitionEpochs(
             mkAssignment(
                 mkTopicAssignment(fooTopicId, 1)
             ),
             11
-        ));
+        );
+        assertEquals(
+            mkAssignment(mkTopicAssignment(fooTopicId, 1)),
+            consumerGroup.getOrMaybeCreateMember("m1", false).assignedPartitions()
+        );

Review Comment:
   ```suggestion
   ```



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##
@@ -388,11 +389,16 @@ public void testRemoveTaskProcessIds(TaskRole taskRole) {
 
         streamsGroup.updateMember(m1);
 
-        // Removing should fail because the expected epoch is incorrect.
-        assertThrows(IllegalStateException.class, () -> streamsGroup.removeTaskProcessIds(
-            TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 10, mkTasks(fooSubtopologyId, 1)),
+        // Removing with incorrect process id should do nothing.
+        // A debug message is logged, no exception is thrown.
+        streamsGroup.removeTaskProcessIds(
+            TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 9, mkTasks(fooSubtopologyId, 1)),
             "process1"
-        ));
+        );
+        assertEquals(m1.assignedTasks(), streamsGroup.getMemberOrThrow("m1").

Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-03 Thread via GitHub


izzyharker commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2585599596


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##
@@ -1027,12 +1027,28 @@ private void addTaskProcessIdFromActiveTasksWithEpochs(
             if (partitionsOrNull == null) {
                 partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedTaskPartitionsWithEpochs.size());
             }
-            for (Integer partitionId : assignedTaskPartitionsWithEpochs.keySet()) {
-                String prevValue = partitionsOrNull.put(partitionId, processId);
-                if (prevValue != null) {
-                    throw new IllegalStateException(
-                        String.format("Cannot set the process ID of %s-%s to %s because the partition is " +
-                            "still owned by process ID %s", subtopologyId, partitionId, processId, prevValue));
+            for (Map.Entry partitionEntry : assignedTaskPartitionsWithEpochs.entrySet()) {
+                Integer partitionId = partitionEntry.getKey();
+                String prevValue = partitionsOrNull.get(partitionId);
+
+                if (prevValue == null) {
+                    partitionsOrNull.put(partitionId, processId);
+                } else {
+                    String memberId = null;
+                    for (Map.Entry memberEntry : members.entrySet()) {
+                        if (memberEntry.getValue().processId().equals(prevValue)) {
+                            memberId = memberEntry.getKey();
+                            break;
+                        }
+                    }
+                    if (memberId != null &&
+                        members.get(memberId).assignedTasks().activeTasksWithEpochs().get(subtopologyId).get(partitionId) <= partitionEntry.getValue()) {
+                        partitionsOrNull.put(partitionId, processId);
+                    } else {
+                        throw new IllegalStateException(
+                            String.format("[GroupId {}] Cannot set the process ID of {}-{} to {} because the partition is " +
+                                "still owned by process ID {}", groupId, subtopologyId, partitionId, processId, prevValue));
+                    }

Review Comment:
   Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]






Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-03 Thread via GitHub


izzyharker commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2585592430


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -24387,6 +24387,296 @@ public void 
testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
 assertDoesNotThrow(() -> context.replay(record));
 }
 
+@Test
+public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() 
{
+String groupId = "fooup";
+String memberA = "memberA";
+String memberB = "memberB";
+
+Uuid topicId = Uuid.randomUuid();
+String topicName = "foo";
+
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(topicId, topicName, 2)
+.buildCoordinatorMetadataImage())
+.build();
+
+// This test enacts the following scenario:
+// 1. Member A is assigned partition 0.
+// 2. Member A is unassigned partition 0 [record removed by 
compaction].
+// 3. Member B is assigned partition 0. 
+// 4. Member A is assigned partition 1. 
+// If record 2 is processed, there are no issues, however with 
compaction it is possible that 
+// unassignment records are removed. We would like to not fail in 
these cases.
+// Therefore we will allow assignments to owned partitions as long as 
the epoch is larger. 
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberA)
+.setState(MemberState.STABLE)
+.setMemberEpoch(11)
+.setPreviousMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+.build()));
+
+// Partition 0's owner is replaced by member B at epoch 12.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberB)
+.setState(MemberState.STABLE)
+.setMemberEpoch(12)
+.setPreviousMemberEpoch(11)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+.build()));
+
+// Partition 0 must remain with member B at epoch 12 even though 
member A has just been unassigned partition 0.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberA)
+.setState(MemberState.STABLE)
+.setMemberEpoch(13)
+.setPreviousMemberEpoch(12)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+.build()));
+
+// Verify partition epochs.
+ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+}
+
+@Test
+public void 
testReplayConsumerGroupUnassignmentRecordSkippedWithCompaction() {
+String groupId = "fooup";
+String memberA = "memberA";
+String memberB = "memberB";
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 3)
+.addTopic(barTopicId, barTopicName, 1)
+.buildCoordinatorMetadataImage())
+.build();
+
+// This test enacts the following scenario:
+// 1. Member A is assigned partition foo-1.
+// 2. Member A is unassigned partition foo-1 [record removed by 
compaction].
+// 3. Member B is assigned partition foo-1.
+// 4. Member B is unassigned partition foo-1. 
+// 5. Member A is assigned partition bar-0. 
+// This is a legitimate set of assignments but with compaction the 
unassignment record can be skipped.
+// We would like to not fail in these cases and allow both the 
assignment of member B to foo-1 and 
+// member A to bar-0 to succeed because the epochs are larger. 
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberA)
+.setState(MemberState.STABLE)
+.setMemberEpoch(11)
+.setPreviousMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0, 1)))
+.build()));
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Buil

Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-03 Thread via GitHub


lucasbru commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2585220217


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -24387,6 +24387,296 @@ public void 
testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
 assertDoesNotThrow(() -> context.replay(record));
 }
 
+@Test
+public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() 
{
+String groupId = "fooup";
+String memberA = "memberA";
+String memberB = "memberB";
+
+Uuid topicId = Uuid.randomUuid();
+String topicName = "foo";
+
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(topicId, topicName, 2)
+.buildCoordinatorMetadataImage())
+.build();
+
+// This test enacts the following scenario:
+// 1. Member A is assigned partition 0.
+// 2. Member A is unassigned partition 0 [record removed by 
compaction].
+// 3. Member B is assigned partition 0. 
+// 4. Member A is assigned partition 1. 
+// If record 2 is processed, there are no issues, however with 
compaction it is possible that 
+// unassignment records are removed. We would like to not fail in 
these cases.
+// Therefore we will allow assignments to owned partitions as long as 
the epoch is larger. 
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberA)
+.setState(MemberState.STABLE)
+.setMemberEpoch(11)
+.setPreviousMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+.build()));
+
+// Partition 0's owner is replaced by member B at epoch 12.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberB)
+.setState(MemberState.STABLE)
+.setMemberEpoch(12)
+.setPreviousMemberEpoch(11)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+.build()));
+
+// Partition 0 must remain with member B at epoch 12 even though 
member A has just been unassigned partition 0.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberA)
+.setState(MemberState.STABLE)
+.setMemberEpoch(13)
+.setPreviousMemberEpoch(12)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+.build()));
+
+// Verify partition epochs.
+ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+}
+
+@Test
+public void 
testReplayConsumerGroupUnassignmentRecordSkippedWithCompaction() {
+String groupId = "fooup";
+String memberA = "memberA";
+String memberB = "memberB";
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 3)
+.addTopic(barTopicId, barTopicName, 1)
+.buildCoordinatorMetadataImage())
+.build();
+
+// This test enacts the following scenario:
+// 1. Member A is assigned partition foo-1.
+// 2. Member A is unassigned partition foo-1 [record removed by 
compaction].
+// 3. Member B is assigned partition foo-1.
+// 4. Member B is unassigned partition foo-1. 
+// 5. Member A is assigned partition bar-0. 
+// This is a legitimate set of assignments but with compaction the 
unassignment record can be skipped.
+// We would like to not fail in these cases and allow both the 
assignment of member B to foo-1 and 
+// member A to bar-0 to succeed because the epochs are larger. 
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberA)
+.setState(MemberState.STABLE)
+.setMemberEpoch(11)
+.setPreviousMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0, 1)))
+.build()));
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builde

Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-03 Thread via GitHub


lucasbru commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2585097590


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##
@@ -1027,12 +1027,28 @@ private void addTaskProcessIdFromActiveTasksWithEpochs(
 if (partitionsOrNull == null) {
 partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, 
assignedTaskPartitionsWithEpochs.size());
 }
-for (Integer partitionId : 
assignedTaskPartitionsWithEpochs.keySet()) {
-String prevValue = partitionsOrNull.put(partitionId, 
processId);
-if (prevValue != null) {
-throw new IllegalStateException(
-String.format("Cannot set the process ID of %s-%s 
to %s because the partition is " +
-"still owned by process ID %s", subtopologyId, 
partitionId, processId, prevValue));
+for (Map.Entry partitionEntry : 
assignedTaskPartitionsWithEpochs.entrySet()) {
+Integer partitionId = partitionEntry.getKey();
+String prevValue = partitionsOrNull.get(partitionId);
+
+if (prevValue == null) {
+partitionsOrNull.put(partitionId, processId);
+} else {
+String memberId = null;
+for (Map.Entry memberEntry 
: members.entrySet()) {
+if 
(memberEntry.getValue().processId().equals(prevValue)) {
+memberId = memberEntry.getKey();
+break;
+}
+}
+if (memberId != null && 
+
members.get(memberId).assignedTasks().activeTasksWithEpochs().get(subtopologyId).get(partitionId)
 <= partitionEntry.getValue()) {
+partitionsOrNull.put(partitionId, processId);
+} else {
+throw new IllegalStateException(
+String.format("[GroupId {}] Cannot set the 
process ID of {}-{} to {} because the partition is " +
+"still owned by process ID {}", groupId, 
subtopologyId, partitionId, processId, prevValue));
+}

Review Comment:
   Yes, we are using processIds here to guide the assignment (no stateful tasks 
can be assigned to the same processID), but we don't identify a single member. 
That could be added, of course, but we saw that this wasn't really used in 
consumer groups. So I would support not doing this check here.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-03 Thread via GitHub


squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2584282720


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##
@@ -375,11 +374,11 @@ public void testRemoveTaskProcessIds(TaskRole taskRole) {
 String fooSubtopologyId = "foo-sub";
 StreamsGroup streamsGroup = createStreamsGroup("foo");
 
-// Removing should fail because there is no epoch set.
-assertThrows(IllegalStateException.class, () -> 
streamsGroup.removeTaskProcessIds(
+// Removing should be a no-op when there is no epoch set.

Review Comment:
   ```suggestion
   // Removing should be a no-op when there is no process id set.
   ```



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##
@@ -388,11 +387,17 @@ public void testRemoveTaskProcessIds(TaskRole taskRole) {
 
 streamsGroup.updateMember(m1);
 
-// Removing should fail because the expected epoch is incorrect.
-assertThrows(IllegalStateException.class, () -> 
streamsGroup.removeTaskProcessIds(
-TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 10, 
mkTasks(fooSubtopologyId, 1)),
+// Removing with incorrect epoch should do nothing. 

Review Comment:
   ```suggestion
   // Removing with incorrect process id should do nothing. 
   ```



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##
@@ -388,11 +387,17 @@ public void testRemoveTaskProcessIds(TaskRole taskRole) {
 
 streamsGroup.updateMember(m1);
 
-// Removing should fail because the expected epoch is incorrect.
-assertThrows(IllegalStateException.class, () -> 
streamsGroup.removeTaskProcessIds(
-TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 10, 
mkTasks(fooSubtopologyId, 1)),
+// Removing with incorrect epoch should do nothing. 
+// A debug message is logged, no exception is thrown.
+streamsGroup.removeTaskProcessIds(
+TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 9, 
mkTasks(fooSubtopologyId, 1)),
 "process1"
-));
+);

Review Comment:
   Can we assert that the process id is unchanged?
   ```java
   if (taskRole == TaskRole.ACTIVE) {
   assertEquals("process", 
streamsGroup.currentActiveTaskProcessId(fooSubtopologyId, 1));
   }
   ```



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -24387,6 +24387,300 @@ public void 
testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
 assertDoesNotThrow(() -> context.replay(record));
 }
 
+@Test
+public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() 
{
+String groupId = "fooup";
+String memberIdA = "memberIdA";
+String memberIdB = "memberIdB";
+
+Uuid topicId = Uuid.randomUuid();
+String topicName = "foo";
+
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(topicId, topicName, 2)
+.buildCoordinatorMetadataImage())
+.build();
+
+// This test enacts the following scenario:
+// 1. Member A is assigned partition 0.
+// 2. Member A is unassigned partition 0 [record removed by 
compaction].
+// 3. Member B is assigned partition 0. 
+// 4. Member A is assigned partition 1. 
+// If record 2 is processed, there are no issues, however with 
compaction it is possible that 
+// unassignment records are removed. We would like to not fail in 
these cases.
+// Therefore we will allow assignments to owned partitions as long as 
the epoch is larger. 
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+.setState(MemberState.STABLE)
+.setMemberEpoch(11)
+.setPreviousMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+.build()));
+
+// Partition 0's owner is replaced by member B at epoch 12.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+.setState(MemberState.STABLE)
+.setMemberEpoch(12)
+.setPreviousMemberEpoch(11)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+.build()));
+
+// Partition 0 must remain with member B at epoch 12 even though 
member A has just been unassigned partition 0.
+
context.replay(GroupCoordinatorRecordHelpers.newC

Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-02 Thread via GitHub


izzyharker commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2582862997


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##
@@ -1027,12 +1027,28 @@ private void addTaskProcessIdFromActiveTasksWithEpochs(
 if (partitionsOrNull == null) {
 partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, 
assignedTaskPartitionsWithEpochs.size());
 }
-for (Integer partitionId : 
assignedTaskPartitionsWithEpochs.keySet()) {
-String prevValue = partitionsOrNull.put(partitionId, 
processId);
-if (prevValue != null) {
-throw new IllegalStateException(
-String.format("Cannot set the process ID of %s-%s 
to %s because the partition is " +
-"still owned by process ID %s", subtopologyId, 
partitionId, processId, prevValue));
+for (Map.Entry partitionEntry : 
assignedTaskPartitionsWithEpochs.entrySet()) {
+Integer partitionId = partitionEntry.getKey();
+String prevValue = partitionsOrNull.get(partitionId);
+
+if (prevValue == null) {
+partitionsOrNull.put(partitionId, processId);
+} else {
+String memberId = null;
+for (Map.Entry memberEntry 
: members.entrySet()) {
+if 
(memberEntry.getValue().processId().equals(prevValue)) {
+memberId = memberEntry.getKey();
+break;
+}
+}
+if (memberId != null && 
+
members.get(memberId).assignedTasks().activeTasksWithEpochs().get(subtopologyId).get(partitionId)
 <= partitionEntry.getValue()) {
+partitionsOrNull.put(partitionId, processId);
+} else {
+throw new IllegalStateException(
+String.format("[GroupId {}] Cannot set the 
process ID of {}-{} to {} because the partition is " +
+"still owned by process ID {}", groupId, 
subtopologyId, partitionId, processId, prevValue));
+}

Review Comment:
   Applied the same version of the fix as in ConsumerGroup#addPartitionEpochs. 
The way epochs are implemented for Streams groups is not very conducive to 
this use case, which is why it's a bit convoluted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-02 Thread via GitHub


izzyharker commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2582174253


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -24387,6 +24387,296 @@ public void 
testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
 assertDoesNotThrow(() -> context.replay(record));
 }
 
+@Test
+public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() 
{
+String groupId = "fooup";
+String memberA = "memberA";
+String memberB = "memberB";
+
+Uuid topicId = Uuid.randomUuid();
+String topicName = "foo";
+
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(topicId, topicName, 2)
+.buildCoordinatorMetadataImage())
+.build();
+
+// This test enacts the following scenario:
+// 1. Member A is assigned partition 0.
+// 2. Member A is unassigned partition 0 [record removed by 
compaction].
+// 3. Member B is assigned partition 0. 
+// 4. Member A is assigned partition 1. 
+// If record 2 is processed, there are no issues, however with 
compaction it is possible that 
+// unassignment records are removed. We would like to not fail in 
these cases.
+// Therefore we will allow assignments to owned partitions as long as 
the epoch is larger. 
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberA)
+.setState(MemberState.STABLE)
+.setMemberEpoch(11)
+.setPreviousMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+.build()));
+
+// Partition 0's owner is replaced by member B at epoch 12.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberB)
+.setState(MemberState.STABLE)
+.setMemberEpoch(12)
+.setPreviousMemberEpoch(11)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+.build()));
+
+// Partition 0 must remain with member B at epoch 12 even though 
member A has just been unassigned partition 0.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberA)
+.setState(MemberState.STABLE)
+.setMemberEpoch(13)
+.setPreviousMemberEpoch(12)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+.build()));
+
+// Verify partition epochs.
+ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+}
+
+@Test
+public void 
testReplayConsumerGroupUnassignmentRecordSkippedWithCompaction() {
+String groupId = "fooup";
+String memberA = "memberA";
+String memberB = "memberB";
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 3)
+.addTopic(barTopicId, barTopicName, 1)
+.buildCoordinatorMetadataImage())
+.build();
+
+// This test enacts the following scenario:
+// 1. Member A is assigned partition foo-1.
+// 2. Member A is unassigned partition foo-1 [record removed by 
compaction].
+// 3. Member B is assigned partition foo-1.
+// 4. Member B is unassigned partition foo-1. 
+// 5. Member A is assigned partition bar-0. 
+// This is a legitimate set of assignments but with compaction the 
unassignment record can be skipped.
+// We would like to not fail in these cases and allow both the 
assignment of member B to foo-1 and 
+// member A to bar-0 to succeed because the epochs are larger. 
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberA)
+.setState(MemberState.STABLE)
+.setMemberEpoch(11)
+.setPreviousMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0, 1)))
+.build()));
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Buil

Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-02 Thread via GitHub


izzyharker commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2581893040


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##
@@ -286,23 +287,37 @@ public void 
testUpdatingMemberUpdatesPartitionEpochWhenPartitionIsNotReleased()
 mkTopicAssignment(fooTopicId, 1)))
 .build();
 
-// m2 should not be able to acquire foo-1 because the partition is
-// still owned by another member.
-assertThrows(IllegalStateException.class, () -> 
consumerGroup.updateMember(m2));
+// m2 can acquire foo-1 because the epoch is at least as large as m1's 
epoch.
+consumerGroup.updateMember(m2);
+assertEquals(mkAssignment(mkTopicAssignment(fooTopicId, 1)), 
+consumerGroup.getOrMaybeCreateMember("m1", 
false).assignedPartitions()
+);

Review Comment:
   The `getOrMaybeCreateMember` call is used for assertions in a number of the 
other tests as well; it was meant to check that the `removePartitionEpochs` 
call didn't change the member assignment. It's maybe not necessary to have 
both that and the assert on the epoch, though. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-02 Thread via GitHub


squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2579928090


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -24387,6 +24387,296 @@ public void 
testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
 assertDoesNotThrow(() -> context.replay(record));
 }
 
+@Test
+public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() 
{
+String groupId = "fooup";
+String memberA = "memberA";
+String memberB = "memberB";
+
+Uuid topicId = Uuid.randomUuid();
+String topicName = "foo";
+
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(topicId, topicName, 2)
+.buildCoordinatorMetadataImage())

Review Comment:
   The indentation's not quite right here.
   ```suggestion
   .addTopic(topicId, topicName, 2)
   .buildCoordinatorMetadataImage())
   ```
   
   The same for the other tests.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -24387,6 +24387,296 @@ public void 
testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
 assertDoesNotThrow(() -> context.replay(record));
 }
 
+@Test
+public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() 
{
+String groupId = "fooup";
+String memberA = "memberA";
+String memberB = "memberB";
+
+Uuid topicId = Uuid.randomUuid();
+String topicName = "foo";
+
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(topicId, topicName, 2)
+.buildCoordinatorMetadataImage())
+.build();
+
+// This test enacts the following scenario:
+// 1. Member A is assigned partition 0.
+// 2. Member A is unassigned partition 0 [record removed by 
compaction].
+// 3. Member B is assigned partition 0. 
+// 4. Member A is assigned partition 1. 
+// If record 2 is processed, there are no issues, however with 
compaction it is possible that 
+// unassignment records are removed. We would like to not fail in 
these cases.
+// Therefore we will allow assignments to owned partitions as long as 
the epoch is larger. 
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberA)
+.setState(MemberState.STABLE)
+.setMemberEpoch(11)
+.setPreviousMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+.build()));
+
+// Partition 0's owner is replaced by member B at epoch 12.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberB)
+.setState(MemberState.STABLE)
+.setMemberEpoch(12)
+.setPreviousMemberEpoch(11)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+.build()));
+
+// Partition 0 must remain with member B at epoch 12 even though 
member A has just been unassigned partition 0.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberA)
+.setState(MemberState.STABLE)
+.setMemberEpoch(13)
+.setPreviousMemberEpoch(12)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+.build()));
+
+// Verify partition epochs.
+ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+}
+
+@Test
+public void 
testReplayConsumerGroupUnassignmentRecordSkippedWithCompaction() {
+String groupId = "fooup";
+String memberA = "memberA";
+String memberB = "memberB";
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 3)
+.addTopic(barTopicId, barTopicName, 1)
+.buildCoordinatorMetadataImage())
+.build();
+
+// This test enacts the following scenario:
+// 1. Member A is assigned partition f

Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-12-01 Thread via GitHub


izzyharker commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2578119016


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -24387,6 +24387,263 @@ public void 
testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
 assertDoesNotThrow(() -> context.replay(record));
 }
 
+@Test
+public void testConsumerGroupAssignmentResolvesWithCompaction() {
+String groupId = "fooup";
+String memberA = "memberA";
+String memberB = "memberB";
+
+Uuid topicId = Uuid.randomUuid();
+String topicName = "foo";
+
+CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+.addTopic(topicId, topicName, 2)
+.addRacks()
+.buildCoordinatorMetadataImage();
+long topicHash = computeTopicHash(topicName, metadataImage);
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMetadataHash(topicHash))
+.build();
+
+ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+
+// Assign partition 0 to member A
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberA)
+.setState(MemberState.STABLE)
+.setMemberEpoch(11)
+.setPreviousMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+.build()));
+
+// Assign partition 0 to member B. This is allowed even though 
partition 0 is already owned by member A.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberB)
+.setState(MemberState.STABLE)
+.setMemberEpoch(12)
+.setPreviousMemberEpoch(11)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+.build()));
+
+// Now assign partition 1 to member A.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberA)
+.setState(MemberState.STABLE)
+.setMemberEpoch(13)
+.setPreviousMemberEpoch(12)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+.build()));
+
+// Verify partition epochs.
+assertEquals(group.currentPartitionEpoch(topicId, 0), 12);
+assertEquals(group.currentPartitionEpoch(topicId, 1), 13);
+}
+
+@Test
+public void testConsumerGroupUnassignmentResolvesWithCompaction() {

Review Comment:
   The initial variant of this test had all the records replayed (the 
unassignment included), so it didn't actually check the compaction scenario 
with a removed unassignment record. Removing that replay gave the expected 
test behavior. I agree that we could perhaps remove the test, though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-11-26 Thread via GitHub


squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2565519871


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1087,8 +1100,10 @@ void addPartitionEpochs(
 partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, 
assignedPartitions.size());
 }
 for (Integer partitionId : assignedPartitions) {
-Integer prevValue = partitionsOrNull.put(partitionId, 
epoch);
-if (prevValue != null) {
+Integer prevValue = partitionsOrNull.get(partitionId);
+if (prevValue == null || prevValue <= epoch) {

Review Comment:
   Actually I was wrong. It's possible to see a replacement with the same epoch:
   
   1. Member A unsubscribes from `bar` at epoch N - 1. `bar` is pending 
revocation due to the fix for KAFKA-19431. `Member A { epoch: N - 1, assigned 
partitions: [foo], pending revocation: [bar] }`
   2. A new assignment is available with epoch N.
   3. Member A yields bar. The epoch is bumped. `Member A { epoch: N, assigned 
partitions: [], pending revocation: [foo] }`
   4. Member A yields foo. `Member A { epoch: N, assigned partitions: [], 
pending revocation: []}`
   5. Member B is assigned foo. `Member B { epoch: N, assigned partitions: 
[foo] }`
   
   When the record from 4 is dropped by compaction, foo's partition epoch is 
replaced with an identical epoch.
   
   We should also document this in a test. It can be called something like 
`testReplayConsumerGroupCurrentMemberAssignmentSameEpochWithCompaction`
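   
   For illustration, a rough sketch of what such a test could look like, reusing the
   replay helpers from the other compaction tests in this PR. The group/member names
   and epoch values are illustrative, and the scenario is simplified to a single
   dropped unassignment record rather than the full pending-revocation sequence:
   ```java
   @Test
   public void testReplayConsumerGroupCurrentMemberAssignmentSameEpochWithCompaction() {
       String groupId = "fooup";
       String memberA = "memberA";
       String memberB = "memberB";
       Uuid topicId = Uuid.randomUuid();

       GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
           .withMetadataImage(new MetadataImageBuilder()
               .addTopic(topicId, "foo", 1)
               .buildCoordinatorMetadataImage())
           .build();

       // Member A owns foo-0 at epoch 10.
       context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(
           groupId,
           new ConsumerGroupMember.Builder(memberA)
               .setState(MemberState.STABLE)
               .setMemberEpoch(10)
               .setPreviousMemberEpoch(9)
               .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
               .build()));

       // The record where member A yields foo-0 at epoch 10 is dropped by compaction.

       // Member B is assigned foo-0 at the same epoch 10. Replay must not throw.
       context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(
           groupId,
           new ConsumerGroupMember.Builder(memberB)
               .setState(MemberState.STABLE)
               .setMemberEpoch(10)
               .setPreviousMemberEpoch(9)
               .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
               .build()));

       // The partition epoch stays at 10, now owned by member B.
       ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
       assertEquals(10, group.currentPartitionEpoch(topicId, 0));
   }
   ```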



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-11-26 Thread via GitHub


squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2565594140


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -24387,6 +24387,268 @@ public void 
testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
 assertDoesNotThrow(() -> context.replay(record));
 }
 
+@Test
+public void testConsumerGroupAssignmentResolvesWithCompaction() {

Review Comment:
   `testReplayConsumerGroupCurrentMemberAssignmentWithCompaction` is probably a 
better name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-11-26 Thread via GitHub


squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2565519871


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1087,8 +1100,10 @@ void addPartitionEpochs(
 partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, 
assignedPartitions.size());
 }
 for (Integer partitionId : assignedPartitions) {
-Integer prevValue = partitionsOrNull.put(partitionId, 
epoch);
-if (prevValue != null) {
+Integer prevValue = partitionsOrNull.get(partitionId);
+if (prevValue == null || prevValue <= epoch) {

Review Comment:
   Actually I was wrong. It's possible to see a replacement with the same epoch:
   
   1. Member A unsubscribes from `bar` at epoch N - 1. `bar` is pending 
revocation due to the fix for KAFKA-19431. `Member A { epoch: N - 1, assigned 
partitions: [foo], pending revocation: [bar] }`
   2. A new assignment is available with epoch N.
   3. Member A yields bar. The epoch is bumped. `Member A { epoch: N, assigned 
partitions: [], pending revocation: [foo] }`
   4. Member A yields foo. `Member A { epoch: N, assigned partitions: [], 
pending revocation: []}`
   5. Member B is assigned foo. `Member B { epoch: N, assigned partitions: 
[foo] }`
   
   When the record from 4 is dropped by compaction, foo's partition epoch is 
replaced with an identical epoch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-11-26 Thread via GitHub


squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2565519871


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1087,8 +1100,10 @@ void addPartitionEpochs(
 partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, 
assignedPartitions.size());
 }
 for (Integer partitionId : assignedPartitions) {
-Integer prevValue = partitionsOrNull.put(partitionId, 
epoch);
-if (prevValue != null) {
+Integer prevValue = partitionsOrNull.get(partitionId);
+if (prevValue == null || prevValue <= epoch) {

Review Comment:
   Actually I was wrong. It's possible to see a replacement with the same epoch:
   
   1. Member A unsubscribes from `bar` at epoch N - 1. `bar` is pending 
revocation due to the fix for KAFKA-19431. `Member A { epoch: N - 1, assigned 
partitions: [foo], pending revocation: [bar] }`
   2. A new assignment is available with epoch N.
   3. Member A yields bar. `Member A { epoch: N, assigned partitions: [], 
pending revocation: [foo] }`
   4. Member A yields foo. `Member A { epoch: N, assigned partitions: [], 
pending revocation: []}`
   5. Member B is assigned foo. `Member B { epoch: N, assigned partitions: 
[foo] }`
   
   When the record from 4 is dropped by compaction, foo's partition epoch is 
replaced with an identical epoch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-11-26 Thread via GitHub


squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2565519871


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1087,8 +1100,10 @@ void addPartitionEpochs(
 partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, 
assignedPartitions.size());
 }
 for (Integer partitionId : assignedPartitions) {
-Integer prevValue = partitionsOrNull.put(partitionId, 
epoch);
-if (prevValue != null) {
+Integer prevValue = partitionsOrNull.get(partitionId);
+if (prevValue == null || prevValue <= epoch) {

Review Comment:
   Actually I was wrong. It's possible to see a replacement with the same epoch:
   
   1. Member A unsubscribes from `bar` at epoch N - 1. `bar` is pending 
revocation due to KAFKA-19431. `Member A { epoch: N - 1, assigned partitions: 
[foo], pending revocation: [bar] }`
   2. A new assignment is available with epoch N.
   3. Member A yields bar. `Member A { epoch: N, assigned partitions: [], 
pending revocation: [foo] }`
   4. Member A yields foo. `Member A { epoch: N, assigned partitions: [], 
pending revocation: []}`
   5. Member B is assigned foo. `Member B { epoch: N, assigned partitions: 
[foo] }`
   
   When the record from 4 is dropped by compaction, foo's partition epoch is 
replaced with an identical epoch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-11-26 Thread via GitHub


squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2565524884


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##
@@ -288,22 +289,17 @@ public void 
testUpdatingMemberUpdatesPartitionEpochWhenPartitionIsNotReleased()
 
 // m2 should not be able to acquire foo-1 because the partition is
 // still owned by another member.
-assertThrows(IllegalStateException.class, () -> 
consumerGroup.updateMember(m2));
+consumerGroup.updateMember(m2);
+assertEquals(mkAssignment(mkTopicAssignment(fooTopicId, 1)), 
+consumerGroup.getOrMaybeCreateMember("m1", 
false).assignedPartitions()
+);

Review Comment:
   ^ Ignore the above.
   
   We should however test that an `IllegalStateException` is thrown when the 
epoch goes backwards and the update is accepted when the epoch is the same.
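   
   For example, something along these lines could be added to the updated test
   (a sketch: member ids, epochs, and builder fields are abbreviated, and it assumes
   the current owner holds foo-1 at epoch 10 after the earlier updates):
   ```java
   // Same epoch as the current owner: the update is accepted, since the owner's
   // unassignment record may have been dropped by compaction.
   ConsumerGroupMember m3 = new ConsumerGroupMember.Builder("m3")
       .setMemberEpoch(10)
       .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 1)))
       .build();
   consumerGroup.updateMember(m3);

   // Epoch going backwards: the update is rejected.
   ConsumerGroupMember m4 = new ConsumerGroupMember.Builder("m4")
       .setMemberEpoch(9)
       .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 1)))
       .build();
   assertThrows(IllegalStateException.class, () -> consumerGroup.updateMember(m4));
   ```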



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1087,8 +1100,10 @@ void addPartitionEpochs(
 partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, 
assignedPartitions.size());
 }
 for (Integer partitionId : assignedPartitions) {
-Integer prevValue = partitionsOrNull.put(partitionId, 
epoch);
-if (prevValue != null) {
+Integer prevValue = partitionsOrNull.get(partitionId);
+if (prevValue == null || prevValue <= epoch) {

Review Comment:
   Actually I was wrong. It's possible to see a replacement with the same epoch:
   
   1. Member A unsubscribes from bar at epoch N - 1. `Member A { epoch: N - 1, 
assigned partitions: [foo], pending revocation: [bar] }`
   2. A new assignment is available with epoch N.
   3. Member A yields bar. `Member A { epoch: N, assigned partitions: [], 
pending revocation: [foo] }`
   4. Member A yields foo. `Member A { epoch: N, assigned partitions: [], 
pending revocation: []}`
   5. Member B is assigned foo. `Member B { epoch: N, assigned partitions: 
[foo] }`
   
   When the record from 4 is dropped by compaction, foo's partition epoch is 
replaced with an identical epoch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-11-26 Thread via GitHub


squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2563766181


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1087,8 +1100,10 @@ void addPartitionEpochs(
 partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, 
assignedPartitions.size());
 }
 for (Integer partitionId : assignedPartitions) {
-Integer prevValue = partitionsOrNull.put(partitionId, 
epoch);
-if (prevValue != null) {
+Integer prevValue = partitionsOrNull.get(partitionId);
+if (prevValue == null || prevValue <= epoch) {

Review Comment:
   Can we bring back the rejection for replacing the epoch with an identical 
one?
   I don't think it should ever happen unless I'm missing something.
   ```suggestion
   if (prevValue == null || prevValue < epoch) {
   ```



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1048,11 +1057,14 @@ void removePartitionEpochs(
 currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
 if (partitionsOrNull != null) {
 assignedPartitions.forEach(partitionId -> {
-Integer prevValue = 
partitionsOrNull.remove(partitionId);
-if (prevValue != expectedEpoch) {
-throw new IllegalStateException(
-String.format("Cannot remove the epoch %d from 
%s-%s because the partition is " +
-"still owned at a different epoch %d", 
expectedEpoch, topicId, partitionId, prevValue));
+Integer prevValue = partitionsOrNull.get(partitionId);
+if (prevValue != null && prevValue == expectedEpoch) {
+partitionsOrNull.remove(partitionId);
+} else {
+// GroupId added for context. 
+log.debug(
+String.format("[Group %s]: Cannot remove the 
epoch %d from %s-%s because the partition is " +
+"still owned at a different epoch %d", 
groupId, expectedEpoch, topicId, partitionId, prevValue));

Review Comment:
   * We can use the logger's string formatting capabilities.
   * Elsewhere, we prefix with "[GroupId {}] " ('Id' and no ':').
   
   The same for the other log messages.
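   
   For instance, the first message could become something like this (a sketch using
   the SLF4J-style `{}` placeholders the comment refers to):
   ```java
   log.debug("[GroupId {}] Cannot remove the epoch {} from {}-{} because the partition is " +
       "still owned at a different epoch {}", groupId, expectedEpoch, topicId, partitionId, prevValue);
   ```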



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1048,11 +1057,14 @@ void removePartitionEpochs(
 currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
 if (partitionsOrNull != null) {
 assignedPartitions.forEach(partitionId -> {
-Integer prevValue = 
partitionsOrNull.remove(partitionId);
-if (prevValue != expectedEpoch) {
-throw new IllegalStateException(
-String.format("Cannot remove the epoch %d from 
%s-%s because the partition is " +
-"still owned at a different epoch %d", 
expectedEpoch, topicId, partitionId, prevValue));
+Integer prevValue = partitionsOrNull.get(partitionId);
+if (prevValue != null && prevValue == expectedEpoch) {
+partitionsOrNull.remove(partitionId);
+} else {
+// GroupId added for context. 

Review Comment:
   ```suggestion
   ```
   Can we drop this comment? The same for streams.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##
@@ -288,22 +289,17 @@ public void 
testUpdatingMemberUpdatesPartitionEpochWhenPartitionIsNotReleased()
 
 // m2 should not be able to acquire foo-1 because the partition is
 // still owned by another member.
-assertThrows(IllegalStateException.class, () -> 
consumerGroup.updateMember(m2));
+consumerGroup.updateMember(m2);
+assertEquals(mkAssignment(mkTopicAssignment(fooTopicId, 1)), 
+consumerGroup.getOrMaybeCreateMember("m1", 
false).assignedPartitions()
+);
 }
 
 @Test
 public void testRemovePartitionEpochs() {
 Uuid fooTopicId = Uuid.randomUuid();
 ConsumerGroup consumerGroup = createConsumerGroup("foo");
 
-// Removing should fail because there is no epoch set.
-assertThrows(IllegalStateException.class, () -> 
consumerGroup.removePartitionEpochs(
-mkAssignment(
-mkTopicAssignment(fooTopicId, 1)
-),
-

Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-11-25 Thread via GitHub


dajac commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2560381093


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1087,12 +1100,16 @@ void addPartitionEpochs(
 partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedPartitions.size());
 }
 for (Integer partitionId : assignedPartitions) {
-Integer prevValue = partitionsOrNull.put(partitionId, epoch);
+Integer prevValue = partitionsOrNull.get(partitionId);
 if (prevValue != null) {
-throw new IllegalStateException(
-String.format("Cannot set the epoch of %s-%s to %d because the partition is " +
-"still owned at epoch %d", topicId, partitionId, epoch, prevValue));
+if (prevValue > epoch) {

Review Comment:
   nit: Let's put the happy path first here too and we can also combine the two 
if conditions into one. 
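
   For illustration, a sketch of the combined form (assuming the non-monotonic case keeps throwing, as discussed elsewhere in this review; not the exact code in the PR):
   ```java
   Integer prevValue = partitionsOrNull.get(partitionId);
   if (prevValue == null || prevValue < epoch) {
       // Happy path: the partition is free, or a compacted release record left behind a
       // smaller epoch that the new owner may supersede.
       partitionsOrNull.put(partitionId, epoch);
   } else {
       // Compaction can drop records but not reorder them, so an equal or larger epoch
       // here indicates corrupt state.
       throw new IllegalStateException(
           String.format("Cannot set the epoch of %s-%s to %d because the partition is " +
               "still owned at epoch %d", topicId, partitionId, epoch, prevValue));
   }
   ```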



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##
@@ -942,11 +942,13 @@ private void removeTaskProcessIds(
 currentTasksProcessId.compute(subtopologyId, (__, partitionsOrNull) -> {
 if (partitionsOrNull != null) {
 assignedPartitions.keySet().forEach(partitionId -> {
-String prevValue = partitionsOrNull.remove(partitionId);
+String prevValue = partitionsOrNull.get(partitionId);
 if (!Objects.equals(prevValue, expectedProcessId)) {

Review Comment:
   Same comments for this file.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1048,11 +1058,13 @@ void removePartitionEpochs(
 currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
 if (partitionsOrNull != null) {
 assignedPartitions.forEach(partitionId -> {
-Integer prevValue = partitionsOrNull.remove(partitionId);
+Integer prevValue = partitionsOrNull.get(partitionId);
 if (prevValue != expectedEpoch) {

Review Comment:
   nit: Let's invert the condition so we have the happy path first.
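
   For illustration, a sketch with the happy path first (error handling elided; the debug wording is discussed in the other comments):
   ```java
   Integer prevValue = partitionsOrNull.get(partitionId);
   if (prevValue != null && prevValue == expectedEpoch) {
       // Happy path: the member still owns the partition at the expected epoch.
       partitionsOrNull.remove(partitionId);
   } else {
       // The release record may have been removed by compaction; log at debug as
       // discussed in the other comments, instead of throwing.
   }
   ```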



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -24387,6 +24387,263 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
 assertDoesNotThrow(() -> context.replay(record));
 }
 
+@Test
+public void testConsumerGroupAssignmentResolvesWithCompaction() {
+String groupId = "fooup";
+String memberA = "memberA";
+String memberB = "memberB";
+
+Uuid topicId = Uuid.randomUuid();
+String topicName = "foo";
+
+CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+.addTopic(topicId, topicName, 2)
+.addRacks()
+.buildCoordinatorMetadataImage();
+long topicHash = computeTopicHash(topicName, metadataImage);
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMetadataHash(topicHash))
+.build();
+
+ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+
+// Assign partition 0 to member A
+context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+.setState(MemberState.STABLE)
+.setMemberEpoch(11)
+.setPreviousMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+.build()));
+
+// Assign partition 0 to member B. This is allowed even though partition 0 is already owned by member A.
+context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+.setState(MemberState.STABLE)
+.setMemberEpoch(12)
+.setPreviousMemberEpoch(11)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+.build()));
+
+// Now assign partition 1 to member A.
+context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+.setState(MemberState.STABLE)
+.setMemberEpoc

Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-11-23 Thread via GitHub


squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2547611134


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1121,6 +1128,7 @@ public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
 /**
  * Create a new consumer group according to the given classic group.
  *
+ * @param logContextThe LogContext.

Review Comment:
   ```suggestion
* @param logContextThe log context.
   ```



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1089,7 +1096,7 @@ void addPartitionEpochs(
 for (Integer partitionId : assignedPartitions) {
 Integer prevValue = partitionsOrNull.put(partitionId, epoch);

Review Comment:
   I think compaction only allows records to be dropped and not reordered (they 
need to keep their offsets!). So we should only replace a non-null epoch with a 
larger one here and it makes sense to add a check to throw an 
IllegalStateException otherwise.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##
@@ -918,7 +918,7 @@ void removeTaskProcessIds(
  *
  * @param assignmentThe assignment.
  * @param expectedProcessId The expected process ID.
- * @throws IllegalStateException if the process ID does not match the expected one. package-private for testing.
+ * @throws IllegalStateException if the process ID does not exist. package-private for testing.

Review Comment:
   ```suggestion
* package-private for testing.
   ```



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -103,6 +105,8 @@ public String toLowerCaseString() {
 }
 }
 
+private final Logger log;

Review Comment:
   ```suggestion
   /**
* The logger.
*/
   private final Logger log;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-11-21 Thread via GitHub


izzyharker commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2550372081


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1089,7 +1096,7 @@ void addPartitionEpochs(
 for (Integer partitionId : assignedPartitions) {
 Integer prevValue = partitionsOrNull.put(partitionId, epoch);

Review Comment:
   Now that you mention it, I think it would be good to at least update the 
logic to be more in line with the `remove` methods - so we `get` the value first 
and only `put` it if we don't hit the check. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-11-21 Thread via GitHub


dajac commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2550269866


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1048,11 +1053,13 @@ void removePartitionEpochs(
 currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
 if (partitionsOrNull != null) {
 assignedPartitions.forEach(partitionId -> {
-Integer prevValue = partitionsOrNull.remove(partitionId);
+Integer prevValue = partitionsOrNull.get(partitionId);
 if (prevValue != expectedEpoch) {
-throw new IllegalStateException(
+log.warn(

Review Comment:
   Having `[GroupCoordinator id=%d]` is great but I still believe that having 
the group id is also needed in order to have a full context. In 
`GroupMetadataManager`, we usually add `[GroupId {}] ` after `[GroupCoordinator 
id=%d]`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-11-21 Thread via GitHub


izzyharker commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2550099444


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1048,11 +1053,13 @@ void removePartitionEpochs(
 currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
 if (partitionsOrNull != null) {
 assignedPartitions.forEach(partitionId -> {
-Integer prevValue = partitionsOrNull.remove(partitionId);
+Integer prevValue = partitionsOrNull.get(partitionId);
 if (prevValue != expectedEpoch) {
-throw new IllegalStateException(
+log.warn(

Review Comment:
   Logging the messages as debug is fine by me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-11-21 Thread via GitHub


izzyharker commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2550097548


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1048,11 +1053,13 @@ void removePartitionEpochs(
 currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
 if (partitionsOrNull != null) {
 assignedPartitions.forEach(partitionId -> {
-Integer prevValue = partitionsOrNull.remove(partitionId);
+Integer prevValue = partitionsOrNull.get(partitionId);
 if (prevValue != expectedEpoch) {
-throw new IllegalStateException(
+log.warn(

Review Comment:
   The LogContext gets passed from the GroupMetadataManager so the log messages 
should be prefixed with `[GroupCoordinator id=%d]` already. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-11-21 Thread via GitHub


dajac commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2549727081


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1048,11 +1053,13 @@ void removePartitionEpochs(
 currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
 if (partitionsOrNull != null) {
 assignedPartitions.forEach(partitionId -> {
-Integer prevValue = partitionsOrNull.remove(partitionId);
+Integer prevValue = partitionsOrNull.get(partitionId);
 if (prevValue != expectedEpoch) {
-throw new IllegalStateException(
+log.warn(

Review Comment:
   I am not a fan of those warnings. I think that users will be confused by 
them and they will think that something is wrong even if it is actually 
expected. Should we log them as debug? I would also include the group id. The 
message is meaningless without the group id.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1089,7 +1096,7 @@ void addPartitionEpochs(
 for (Integer partitionId : assignedPartitions) {
 Integer prevValue = partitionsOrNull.put(partitionId, epoch);

Review Comment:
   While we are here, I wonder if we need to strengthen the logic. At minimum, 
we could ensure that a member can only "acquire the lock" if it has an epoch 
larger than the previous one. I am not sure whether we could have records 
reordered to be in this situation though. @squah-confluent What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-11-19 Thread via GitHub


izzyharker commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2543032165


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -147,6 +149,29 @@ public String toLowerCaseString() {
 
 private final TimelineObject hasSubscriptionMetadataRecord;
 
+private final LogContext logContext;
+
+private final Logger log;
+
+public ConsumerGroup(
+LogContext logContext,
+SnapshotRegistry snapshotRegistry,
+String groupId
+) {

Review Comment:
   I think the second constructor can just call the first unless it really 
needs to be removed? That way it's still not as redundant.
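
   For illustration, a sketch of the delegation (assuming the two-argument constructor is kept for tests and simply defaults the log context):
   ```java
   public ConsumerGroup(
       SnapshotRegistry snapshotRegistry,
       String groupId
   ) {
       // Delegate to the primary constructor with a default, prefix-less LogContext.
       this(new LogContext(), snapshotRegistry, groupId);
   }
   ```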



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-11-19 Thread via GitHub


squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2542895318


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -147,6 +149,29 @@ public String toLowerCaseString() {
 
 private final TimelineObject hasSubscriptionMetadataRecord;
 
+private final LogContext logContext;
+
+private final Logger log;

Review Comment:
   nit: I'd move this to the top of the fields, before 
`TimelineObject state`.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -147,6 +149,29 @@ public String toLowerCaseString() {
 
 private final TimelineObject hasSubscriptionMetadataRecord;
 
+private final LogContext logContext;

Review Comment:
   We can drop this field since we never use it.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -147,6 +149,29 @@ public String toLowerCaseString() {
 
 private final TimelineObject hasSubscriptionMetadataRecord;
 
+private final LogContext logContext;
+
+private final Logger log;
+
+public ConsumerGroup(
+LogContext logContext,
+SnapshotRegistry snapshotRegistry,
+String groupId
+) {

Review Comment:
   Can we remove the other constructor? In the tests we can fill in the extra 
parameter with `new LogContext()`.
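
   For example, a test call site could then look roughly like this (group id and variable names are illustrative):
   ```java
   ConsumerGroup consumerGroup = new ConsumerGroup(new LogContext(), snapshotRegistry, "foo");
   ```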



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -147,6 +149,29 @@ public String toLowerCaseString() {
 
 private final TimelineObject hasSubscriptionMetadataRecord;
 
+private final LogContext logContext;
+
+private final Logger log;
+
+public ConsumerGroup(
+LogContext logContext,
+SnapshotRegistry snapshotRegistry,
+String groupId
+) {
+super(snapshotRegistry, groupId);
+this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
+this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
+this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0);
+this.numClassicProtocolMembers = new TimelineInteger(snapshotRegistry);
+this.classicProtocolMembersSupportedProtocols = new TimelineHashMap<>(snapshotRegistry, 0);
+this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0);
+this.subscribedRegularExpressions = new TimelineHashMap<>(snapshotRegistry, 0);
+this.resolvedRegularExpressions = new TimelineHashMap<>(snapshotRegistry, 0);
+this.hasSubscriptionMetadataRecord = new TimelineObject<>(snapshotRegistry, false);
+this.logContext = logContext;
+this.log = logContext.logger(ConsumerGroup.class);

Review Comment:
   nit: same, I'd move this to the top of the constructor, after super().



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -24057,4 +24057,318 @@ private Map

Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-11-19 Thread via GitHub


squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2542815659


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1034,7 +1038,7 @@ private void maybeRemovePartitionEpoch(
  *
  * @param assignmentThe assignment.
  * @param expectedEpoch The expected epoch.
- * @throws IllegalStateException if the epoch does not match the expected one.
+ * @throws IllegalStateException if the epoch does not exist.

Review Comment:
   I'm referring to the remaining IllegalStateException case `Cannot remove the 
epoch %d from %s because it does not have any epoch`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-11-19 Thread via GitHub


izzyharker commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2542651472


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1034,7 +1038,7 @@ private void maybeRemovePartitionEpoch(
  *
  * @param assignmentThe assignment.
  * @param expectedEpoch The expected epoch.
- * @throws IllegalStateException if the epoch does not match the expected one.
+ * @throws IllegalStateException if the epoch does not exist.

Review Comment:
   Did you mean hitting the same IllegalStateException or the second check in 
the method? I made a second test with that order for the ConsumerGroup and 
didn't have any more issues. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction [kafka]

2025-11-17 Thread via GitHub


squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2535873655


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -77,6 +79,8 @@
  */
 public class ConsumerGroup extends ModernGroup {
 
+private static final Logger log = LoggerFactory.getLogger(ConsumerGroup.class);

Review Comment:
   We have to use the `LogContext` from the `GroupMetadataManager`, otherwise 
we won't prefix log lines with `[GroupCoordinator id=%d]`. When adding 
`LogContext` to the constructor, it should go as the first parameter.
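
   For illustration, a reduced sketch of the wiring (other fields, parameters, and generics elided):
   ```java
   public class ConsumerGroup extends ModernGroup {
       // Logger built from the coordinator's LogContext so every line carries the
       // "[GroupCoordinator id=...]" prefix configured by the GroupMetadataManager.
       private final Logger log;

       public ConsumerGroup(
           LogContext logContext,
           SnapshotRegistry snapshotRegistry,
           String groupId
       ) {
           super(snapshotRegistry, groupId);
           this.log = logContext.logger(ConsumerGroup.class);
       }
   }
   ```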



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -1047,7 +1051,7 @@ void removePartitionEpochs(
 assignedPartitions.forEach(partitionId -> {
 Integer prevValue = partitionsOrNull.remove(partitionId);
 if (prevValue != expectedEpoch) {
-throw new IllegalStateException(
+log.warn(

Review Comment:
   When we hit this case we must not have removed the entry from the map. We 
can replace the `remove` above with a `get` and `remove` if we don't hit this 
check.
   
   The same for streams



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -24057,4 +24057,196 @@ private Map