squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2563766181
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1087,8 +1100,10 @@ void addPartitionEpochs(
                partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedPartitions.size());
            }
            for (Integer partitionId : assignedPartitions) {
-                Integer prevValue = partitionsOrNull.put(partitionId, epoch);
-                if (prevValue != null) {
+                Integer prevValue = partitionsOrNull.get(partitionId);
+                if (prevValue == null || prevValue <= epoch) {
Review Comment:
Can we bring back the rejection for replacing the epoch with an identical
one?
I don't think it should ever happen unless I'm missing something.
```suggestion
if (prevValue == null || prevValue < epoch) {
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1048,11 +1057,14 @@ void removePartitionEpochs(
        currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
            if (partitionsOrNull != null) {
                assignedPartitions.forEach(partitionId -> {
-                    Integer prevValue = partitionsOrNull.remove(partitionId);
-                    if (prevValue != expectedEpoch) {
-                        throw new IllegalStateException(
-                            String.format("Cannot remove the epoch %d from %s-%s because the partition is " +
-                                "still owned at a different epoch %d", expectedEpoch, topicId, partitionId, prevValue));
+                    Integer prevValue = partitionsOrNull.get(partitionId);
+                    if (prevValue != null && prevValue == expectedEpoch) {
+                        partitionsOrNull.remove(partitionId);
+                    } else {
+                        // GroupId added for context.
+                        log.debug(
+                            String.format("[Group %s]: Cannot remove the epoch %d from %s-%s because the partition is " +
+                                "still owned at a different epoch %d", groupId, expectedEpoch, topicId, partitionId, prevValue));
Review Comment:
* We can use the logger's string formatting capabilities.
* Elsewhere, we prefix with "[GroupId {}] " ('Id' and no ':').
The same for the other log messages.
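For reference, a minimal sketch of both points, assuming `log` is the class's SLF4J logger as in the rest of the coordinator:
```java
// Parameterized logging: the message is only formatted when debug is enabled.
log.debug("[GroupId {}] Cannot remove the epoch {} from {}-{} because the partition is " +
    "still owned at a different epoch {}", groupId, expectedEpoch, topicId, partitionId, prevValue);
```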
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1048,11 +1057,14 @@ void removePartitionEpochs(
        currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
            if (partitionsOrNull != null) {
                assignedPartitions.forEach(partitionId -> {
-                    Integer prevValue = partitionsOrNull.remove(partitionId);
-                    if (prevValue != expectedEpoch) {
-                        throw new IllegalStateException(
-                            String.format("Cannot remove the epoch %d from %s-%s because the partition is " +
-                                "still owned at a different epoch %d", expectedEpoch, topicId, partitionId, prevValue));
+                    Integer prevValue = partitionsOrNull.get(partitionId);
+                    if (prevValue != null && prevValue == expectedEpoch) {
+                        partitionsOrNull.remove(partitionId);
+                    } else {
+                        // GroupId added for context.
Review Comment:
```suggestion
```
Can we drop this comment? The same for streams.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -288,22 +289,17 @@ public void testUpdatingMemberUpdatesPartitionEpochWhenPartitionIsNotReleased()
         // m2 should not be able to acquire foo-1 because the partition is
         // still owned by another member.
-        assertThrows(IllegalStateException.class, () -> consumerGroup.updateMember(m2));
+        consumerGroup.updateMember(m2);
+        assertEquals(mkAssignment(mkTopicAssignment(fooTopicId, 1)),
+            consumerGroup.getOrMaybeCreateMember("m1", false).assignedPartitions()
+        );
}
@Test
public void testRemovePartitionEpochs() {
Uuid fooTopicId = Uuid.randomUuid();
ConsumerGroup consumerGroup = createConsumerGroup("foo");
-        // Removing should fail because there is no epoch set.
-        assertThrows(IllegalStateException.class, () -> consumerGroup.removePartitionEpochs(
-            mkAssignment(
-                mkTopicAssignment(fooTopicId, 1)
-            ),
-            10
-        ));
-
Review Comment:
Can we keep this? It just won't throw.
```
// Removing should be a no-op when there is no epoch set.
```
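Roughly, the existing call with the `assertThrows` wrapper dropped:
```java
// Removing should be a no-op when there is no epoch set.
consumerGroup.removePartitionEpochs(
    mkAssignment(
        mkTopicAssignment(fooTopicId, 1)
    ),
    10
);
```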
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -312,13 +308,19 @@ public void testRemovePartitionEpochs() {
consumerGroup.updateMember(m1);
-        // Removing should fail because the expected epoch is incorrect.
-        assertThrows(IllegalStateException.class, () -> consumerGroup.removePartitionEpochs(
+        // Removing with incorrect epoch should fail.
Review Comment:
```suggestion
// Removing with incorrect epoch should do nothing.
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##########
@@ -375,24 +375,20 @@ public void testRemoveTaskProcessIds(TaskRole taskRole) {
String fooSubtopologyId = "foo-sub";
StreamsGroup streamsGroup = createStreamsGroup("foo");
-        // Removing should fail because there is no epoch set.
-        assertThrows(IllegalStateException.class, () -> streamsGroup.removeTaskProcessIds(
-            TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 10, mkTasks(fooSubtopologyId, 1)),
-            "process"
-        ));
-
Review Comment:
Can we keep this?
```
// Removing should be a no-op when there is no epoch set.
```
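i.e. the existing call without the `assertThrows` wrapper:
```java
// Removing should be a no-op when there is no epoch set.
streamsGroup.removeTaskProcessIds(
    TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 10, mkTasks(fooSubtopologyId, 1)),
    "process"
);
```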
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,268 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
+    @Test
+    public void testConsumerGroupAssignmentResolvesWithCompaction() {
+        String groupId = "fooup";
+        String memberA = "memberA";
+        String memberB = "memberB";
+
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(topicId, topicName, 2)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+        long topicHash = computeTopicHash(topicName, metadataImage);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMetadataHash(topicHash))
+            .build();
+
+        ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition 0.
+        // 2. Member A is unassigned partition 0 [record removed by compaction].
+        // 3. Member B is assigned partition 0.
+        // 4. Member A is assigned partition 1.
+        // If record 2 is processed, there are no issues, however with compaction it is possible that
+        // unassignment records are removed. We would like to not fail in these cases.
+        // Therefore we will allow assignments to owned partitions as long as the epoch is larger.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+            .build()));
+
+        // Verify partition epochs.
+        assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+        assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+    }
+
+    @Test
+    public void testConsumerGroupUnassignmentResolvesWithCompaction() {
+        String groupId = "fooup";
+        String memberA = "memberA";
+        String memberB = "memberB";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 3)
+            .addTopic(barTopicId, barTopicName, 2)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+        long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+        long barTopicHash = computeTopicHash(barTopicName, metadataImage);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMetadataHash(computeGroupHash(Map.of(fooTopicName, fooTopicHash, barTopicName, barTopicHash))))
+            .build();
+
+        ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition foo-1.
+        // 2. Member A is unassigned partition foo-1 [record removed by compaction].
+        // 3. Member B is assigned partition foo-1.
+        // 4. Member B is unassigned partition foo-1.
+        // 5. Member A is assigned partition bar-0.
+        // This is a legitimate set of assignments but with compaction the unassignment record can be skipped.
+        // We would like to not fail in these cases and allow both the assignment of member B to foo-1 and
+        // member A to bar-0 to succeed because the epochs are larger.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0, 1)))
+            .build()));
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 1, 2)))
+            .build()));
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(14)
+            .setPreviousMemberEpoch(13)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 2)))
Review Comment:
We need to clear out all `foo` epochs here.
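Something along these lines, following the replay pattern used by the other steps (the epoch values are illustrative, and this assumes `mkAssignment()` with no entries yields an empty assignment):
```java
// Member B gives up its remaining foo partition at a higher epoch,
// leaving no epochs at all for the foo topic.
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
    .setState(MemberState.STABLE)
    .setMemberEpoch(15)
    .setPreviousMemberEpoch(14)
    .setAssignedPartitions(mkAssignment())
    .build()));
```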
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,268 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
+    @Test
+    public void testConsumerGroupAssignmentResolvesWithCompaction() {
Review Comment:
How about
```suggestion
public void testConsumerGroupPartitionEpochsWithCompaction() {
```
The same for streams.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -288,22 +289,17 @@ public void testUpdatingMemberUpdatesPartitionEpochWhenPartitionIsNotReleased()
         // m2 should not be able to acquire foo-1 because the partition is
         // still owned by another member.
-        assertThrows(IllegalStateException.class, () -> consumerGroup.updateMember(m2));
+        consumerGroup.updateMember(m2);
+        assertEquals(mkAssignment(mkTopicAssignment(fooTopicId, 1)),
+            consumerGroup.getOrMaybeCreateMember("m1", false).assignedPartitions()
+        );
Review Comment:
This member update ought to be rejected (throw an IllegalStateException like
previously expected).
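i.e. restore something close to the original assertion:
```java
// Acquiring foo-1 at a non-larger epoch must still be rejected.
assertThrows(IllegalStateException.class, () -> consumerGroup.updateMember(m2));
```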
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##########
@@ -375,24 +375,20 @@ public void testRemoveTaskProcessIds(TaskRole taskRole) {
         String fooSubtopologyId = "foo-sub";
         StreamsGroup streamsGroup = createStreamsGroup("foo");
-        // Removing should fail because there is no epoch set.
-        assertThrows(IllegalStateException.class, () -> streamsGroup.removeTaskProcessIds(
-            TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 10, mkTasks(fooSubtopologyId, 1)),
-            "process"
-        ));
-
         StreamsGroupMember m1 = new StreamsGroupMember.Builder("m1")
             .setProcessId("process")
             .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 10, mkTasks(fooSubtopologyId, 1)))
             .build();
         streamsGroup.updateMember(m1);
-        // Removing should fail because the expected epoch is incorrect.
-        assertThrows(IllegalStateException.class, () -> streamsGroup.removeTaskProcessIds(
+        // Removing with incorrect epoch should fail.
+        // A debug message is logged but no exception is thrown.
+        streamsGroup.removeTaskProcessIds(
             TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 10, mkTasks(fooSubtopologyId, 1)),
             "process1"
-        ));
+        );
+        assertEquals(m1.assignedTasks(), streamsGroup.getMemberOrThrow("m1").assignedTasks());
Review Comment:
We're missing an assert for the epoch itself.
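Something like the below, with the caveat that the accessor name here is a guess at the StreamsGroup lookup (adjust to the actual API):
```java
// Hypothetical accessor; substitute StreamsGroup's real task-epoch lookup.
assertEquals(10, streamsGroup.currentTaskEpoch(taskRole, fooSubtopologyId, 1));
```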
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,268 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
+    @Test
+    public void testConsumerGroupAssignmentResolvesWithCompaction() {
+        String groupId = "fooup";
+        String memberA = "memberA";
+        String memberB = "memberB";
+
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(topicId, topicName, 2)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+        long topicHash = computeTopicHash(topicName, metadataImage);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMetadataHash(topicHash))
+            .build();
Review Comment:
We can remove most of this setup and let the `replay` method create the
group. The group lookup will have to be moved to after the `replay`s.
The same for streams.
```suggestion
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId, topicName, 2)
.buildCoordinatorMetadataImage();
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withMetadataImage(metadataImage)
.build();
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,268 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
+    @Test
+    public void testConsumerGroupAssignmentResolvesWithCompaction() {
+        String groupId = "fooup";
+        String memberA = "memberA";
+        String memberB = "memberB";
+
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(topicId, topicName, 2)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+        long topicHash = computeTopicHash(topicName, metadataImage);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMetadataHash(topicHash))
+            .build();
+
+        ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition 0.
+        // 2. Member A is unassigned partition 0 [record removed by compaction].
+        // 3. Member B is assigned partition 0.
+        // 4. Member A is assigned partition 1.
+        // If record 2 is processed, there are no issues, however with compaction it is possible that
+        // unassignment records are removed. We would like to not fail in these cases.
+        // Therefore we will allow assignments to owned partitions as long as the epoch is larger.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
Review Comment:
```suggestion
        // Partition 0's owner is replaced with member B at epoch 12.
        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
```
+equivalent comment for the streams version.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##########
@@ -375,24 +375,20 @@ public void testRemoveTaskProcessIds(TaskRole taskRole) {
String fooSubtopologyId = "foo-sub";
StreamsGroup streamsGroup = createStreamsGroup("foo");
-        // Removing should fail because there is no epoch set.
-        assertThrows(IllegalStateException.class, () -> streamsGroup.removeTaskProcessIds(
-            TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 10, mkTasks(fooSubtopologyId, 1)),
-            "process"
-        ));
-
         StreamsGroupMember m1 = new StreamsGroupMember.Builder("m1")
             .setProcessId("process")
             .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 10, mkTasks(fooSubtopologyId, 1)))
             .build();
         streamsGroup.updateMember(m1);
-        // Removing should fail because the expected epoch is incorrect.
-        assertThrows(IllegalStateException.class, () -> streamsGroup.removeTaskProcessIds(
+        // Removing with incorrect epoch should fail.
Review Comment:
```suggestion
// Removing with incorrect epoch should do nothing.
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,268 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
+    @Test
+    public void testConsumerGroupAssignmentResolvesWithCompaction() {
+        String groupId = "fooup";
+        String memberA = "memberA";
+        String memberB = "memberB";
+
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(topicId, topicName, 2)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+        long topicHash = computeTopicHash(topicName, metadataImage);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMetadataHash(topicHash))
+            .build();
+
+        ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition 0.
+        // 2. Member A is unassigned partition 0 [record removed by compaction].
+        // 3. Member B is assigned partition 0.
+        // 4. Member A is assigned partition 1.
+        // If record 2 is processed, there are no issues, however with compaction it is possible that
+        // unassignment records are removed. We would like to not fail in these cases.
+        // Therefore we will allow assignments to owned partitions as long as the epoch is larger.
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+            .build()));
Review Comment:
```suggestion
            .build()));
        // Partition 0 must remain with member B at epoch 12 even though member A has just been unassigned partition 0.
```
+equivalent comment for the streams version.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,263 @@ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
         assertDoesNotThrow(() -> context.replay(record));
     }
+    @Test
+    public void testConsumerGroupAssignmentResolvesWithCompaction() {
+        String groupId = "fooup";
+        String memberA = "memberA";
+        String memberB = "memberB";
+
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(topicId, topicName, 2)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+        long topicHash = computeTopicHash(topicName, metadataImage);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMetadataHash(topicHash))
+            .build();
+
+        ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+
+        // Assign partition 0 to member A
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Assign partition 0 to member B. This is allowed even though partition 0 is already owned by member A.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Now assign partition 1 to member A.
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+            .build()));
+
+        // Verify partition epochs.
+        assertEquals(group.currentPartitionEpoch(topicId, 0), 12);
+        assertEquals(group.currentPartitionEpoch(topicId, 1), 13);
+    }
+
+    @Test
+    public void testConsumerGroupUnassignmentResolvesWithCompaction() {
Review Comment:
I don't think the test setup is quite right. The original intention was to
exercise the branch of removePartitionEpochs where the topic has no epochs at
all. Seeing as we already test that in ConsumerGroupTest (or will once the
other review comments are resolved), perhaps we can remove this test case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]