Re: [PR] KAFKA-16313: offline group protocol migration [kafka]

2024-03-15 Thread via GitHub


dongnuo123 closed pull request #15442: KAFKA-16313: offline group protocol 
migration
URL: https://github.com/apache/kafka/pull/15442


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16313: offline group protocol migration [kafka]

2024-03-08 Thread via GitHub


dajac commented on code in PR #15442:
URL: https://github.com/apache/kafka/pull/15442#discussion_r1517690579


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -9607,6 +9607,151 @@ public void 
testOnConsumerGroupStateTransitionOnLoading() {
 verify(context.metrics, 
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
 null);
 }
 
+@Test
+public void testMaybeUpgradeEmptyGroup() {
+String classicGroupId = "classic-group-id";
+String consumerGroupId = "consumer-group-id";
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10));
+
+// A consumer group can't be upgraded.
+List records = new ArrayList<>();
+context.groupMetadataManager.maybeUpgradeEmptyGroup(consumerGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// A non-empty classic group can't be upgraded.
+
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(PREPARING_REBALANCE);
+context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// An empty classic group can be upgraded.
+
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(EMPTY);
+context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, 
records);
+
assertEquals(Arrays.asList(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId)),
 records);
+}
+
+@Test
+public void testMaybeDowngradeEmptyGroup() {
+String classicGroupId = "classic-group-id";
+String consumerGroupId = "consumer-group-id";
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10));
+
+List records = new ArrayList<>();
+context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// A classic group can't be downgraded.
+context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// An empty consumer group can be upgraded.
+context.groupMetadataManager.maybeDowngradeEmptyGroup(consumerGroupId, 
records);
+assertEquals(Arrays.asList(
+
RecordHelpers.newTargetAssignmentEpochTombstoneRecord(consumerGroupId),
+
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(consumerGroupId),
+RecordHelpers.newGroupEpochTombstoneRecord(consumerGroupId)), 
records);
+records.clear();
+
+// A non-empty consumer group can't be downgraded.
+ConsumerGroupMember.Builder memberBuilder = new 
ConsumerGroupMember.Builder(Uuid.randomUuid().toString());
+
context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, 
memberBuilder.build()));
+context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+}
+
+@Test
+public void testConsumerGroupHeartbeatWithEmptyClassicGroup() {
+String classicGroupId = "classic-group-id";
+String memberId = Uuid.randomUuid().toString();
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+assignor.prepareGroupAssignment(new 
GroupAssignment(Collections.emptyMap()));
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVers

Re: [PR] KAFKA-16313: offline group protocol migration [kafka]

2024-03-07 Thread via GitHub


dongnuo123 commented on code in PR #15442:
URL: https://github.com/apache/kafka/pull/15442#discussion_r1516858539


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -9607,6 +9607,151 @@ public void 
testOnConsumerGroupStateTransitionOnLoading() {
 verify(context.metrics, 
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
 null);
 }
 
+@Test
+public void testMaybeUpgradeEmptyGroup() {
+String classicGroupId = "classic-group-id";
+String consumerGroupId = "consumer-group-id";
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10));
+
+// A consumer group can't be upgraded.
+List records = new ArrayList<>();
+context.groupMetadataManager.maybeUpgradeEmptyGroup(consumerGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// A non-empty classic group can't be upgraded.
+
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(PREPARING_REBALANCE);
+context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// An empty classic group can be upgraded.
+
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(EMPTY);
+context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, 
records);
+
assertEquals(Arrays.asList(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId)),
 records);
+}
+
+@Test
+public void testMaybeDowngradeEmptyGroup() {
+String classicGroupId = "classic-group-id";
+String consumerGroupId = "consumer-group-id";
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10));
+
+List records = new ArrayList<>();
+context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// A classic group can't be downgraded.
+context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// An empty consumer group can be upgraded.
+context.groupMetadataManager.maybeDowngradeEmptyGroup(consumerGroupId, 
records);
+assertEquals(Arrays.asList(
+
RecordHelpers.newTargetAssignmentEpochTombstoneRecord(consumerGroupId),
+
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(consumerGroupId),
+RecordHelpers.newGroupEpochTombstoneRecord(consumerGroupId)), 
records);
+records.clear();
+
+// A non-empty consumer group can't be downgraded.
+ConsumerGroupMember.Builder memberBuilder = new 
ConsumerGroupMember.Builder(Uuid.randomUuid().toString());
+
context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, 
memberBuilder.build()));
+context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+}
+
+@Test
+public void testConsumerGroupHeartbeatWithEmptyClassicGroup() {
+String classicGroupId = "classic-group-id";
+String memberId = Uuid.randomUuid().toString();
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+assignor.prepareGroupAssignment(new 
GroupAssignment(Collections.emptyMap()));
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), Metadat

Re: [PR] KAFKA-16313: offline group protocol migration [kafka]

2024-03-07 Thread via GitHub


dongnuo123 commented on code in PR #15442:
URL: https://github.com/apache/kafka/pull/15442#discussion_r1516784744


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -9607,6 +9607,151 @@ public void 
testOnConsumerGroupStateTransitionOnLoading() {
 verify(context.metrics, 
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
 null);
 }
 
+@Test
+public void testMaybeUpgradeEmptyGroup() {
+String classicGroupId = "classic-group-id";
+String consumerGroupId = "consumer-group-id";
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10));
+
+// A consumer group can't be upgraded.
+List records = new ArrayList<>();
+context.groupMetadataManager.maybeUpgradeEmptyGroup(consumerGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// A non-empty classic group can't be upgraded.
+
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(PREPARING_REBALANCE);
+context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// An empty classic group can be upgraded.
+
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(EMPTY);
+context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, 
records);
+
assertEquals(Arrays.asList(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId)),
 records);
+}
+
+@Test
+public void testMaybeDowngradeEmptyGroup() {
+String classicGroupId = "classic-group-id";
+String consumerGroupId = "consumer-group-id";
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10));
+
+List records = new ArrayList<>();
+context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// A classic group can't be downgraded.
+context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// An empty consumer group can be upgraded.
+context.groupMetadataManager.maybeDowngradeEmptyGroup(consumerGroupId, 
records);
+assertEquals(Arrays.asList(
+
RecordHelpers.newTargetAssignmentEpochTombstoneRecord(consumerGroupId),
+
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(consumerGroupId),
+RecordHelpers.newGroupEpochTombstoneRecord(consumerGroupId)), 
records);
+records.clear();
+
+// A non-empty consumer group can't be downgraded.
+ConsumerGroupMember.Builder memberBuilder = new 
ConsumerGroupMember.Builder(Uuid.randomUuid().toString());
+
context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, 
memberBuilder.build()));
+context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+}
+
+@Test
+public void testConsumerGroupHeartbeatWithEmptyClassicGroup() {
+String classicGroupId = "classic-group-id";
+String memberId = Uuid.randomUuid().toString();
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+assignor.prepareGroupAssignment(new 
GroupAssignment(Collections.emptyMap()));
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), Metadat

Re: [PR] KAFKA-16313: offline group protocol migration [kafka]

2024-03-07 Thread via GitHub


dongnuo123 commented on code in PR #15442:
URL: https://github.com/apache/kafka/pull/15442#discussion_r1516784744


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -9607,6 +9607,151 @@ public void 
testOnConsumerGroupStateTransitionOnLoading() {
 verify(context.metrics, 
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
 null);
 }
 
+@Test
+public void testMaybeUpgradeEmptyGroup() {
+String classicGroupId = "classic-group-id";
+String consumerGroupId = "consumer-group-id";
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10));
+
+// A consumer group can't be upgraded.
+List records = new ArrayList<>();
+context.groupMetadataManager.maybeUpgradeEmptyGroup(consumerGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// A non-empty classic group can't be upgraded.
+
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(PREPARING_REBALANCE);
+context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// An empty classic group can be upgraded.
+
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(EMPTY);
+context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, 
records);
+
assertEquals(Arrays.asList(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId)),
 records);
+}
+
+@Test
+public void testMaybeDowngradeEmptyGroup() {
+String classicGroupId = "classic-group-id";
+String consumerGroupId = "consumer-group-id";
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10));
+
+List records = new ArrayList<>();
+context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// A classic group can't be downgraded.
+context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// An empty consumer group can be upgraded.
+context.groupMetadataManager.maybeDowngradeEmptyGroup(consumerGroupId, 
records);
+assertEquals(Arrays.asList(
+
RecordHelpers.newTargetAssignmentEpochTombstoneRecord(consumerGroupId),
+
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(consumerGroupId),
+RecordHelpers.newGroupEpochTombstoneRecord(consumerGroupId)), 
records);
+records.clear();
+
+// A non-empty consumer group can't be downgraded.
+ConsumerGroupMember.Builder memberBuilder = new 
ConsumerGroupMember.Builder(Uuid.randomUuid().toString());
+
context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, 
memberBuilder.build()));
+context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+}
+
+@Test
+public void testConsumerGroupHeartbeatWithEmptyClassicGroup() {
+String classicGroupId = "classic-group-id";
+String memberId = Uuid.randomUuid().toString();
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+assignor.prepareGroupAssignment(new 
GroupAssignment(Collections.emptyMap()));
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), Metadat

Re: [PR] KAFKA-16313: offline group protocol migration [kafka]

2024-03-07 Thread via GitHub


dajac commented on code in PR #15442:
URL: https://github.com/apache/kafka/pull/15442#discussion_r1516473475


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -9607,6 +9607,151 @@ public void 
testOnConsumerGroupStateTransitionOnLoading() {
 verify(context.metrics, 
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
 null);
 }
 
+@Test
+public void testMaybeUpgradeEmptyGroup() {
+String classicGroupId = "classic-group-id";
+String consumerGroupId = "consumer-group-id";
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10));
+
+// A consumer group can't be upgraded.
+List records = new ArrayList<>();
+context.groupMetadataManager.maybeUpgradeEmptyGroup(consumerGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// A non-empty classic group can't be upgraded.
+
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(PREPARING_REBALANCE);
+context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// An empty classic group can be upgraded.
+
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(EMPTY);
+context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, 
records);
+
assertEquals(Arrays.asList(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId)),
 records);
+}
+
+@Test
+public void testMaybeDowngradeEmptyGroup() {
+String classicGroupId = "classic-group-id";
+String consumerGroupId = "consumer-group-id";
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10));
+
+List records = new ArrayList<>();
+context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// A classic group can't be downgraded.
+context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// An empty consumer group can be upgraded.
+context.groupMetadataManager.maybeDowngradeEmptyGroup(consumerGroupId, 
records);
+assertEquals(Arrays.asList(
+
RecordHelpers.newTargetAssignmentEpochTombstoneRecord(consumerGroupId),
+
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(consumerGroupId),
+RecordHelpers.newGroupEpochTombstoneRecord(consumerGroupId)), 
records);
+records.clear();
+
+// A non-empty consumer group can't be downgraded.
+ConsumerGroupMember.Builder memberBuilder = new 
ConsumerGroupMember.Builder(Uuid.randomUuid().toString());
+
context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, 
memberBuilder.build()));
+context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+}
+
+@Test
+public void testConsumerGroupHeartbeatWithEmptyClassicGroup() {
+String classicGroupId = "classic-group-id";
+String memberId = Uuid.randomUuid().toString();
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+assignor.prepareGroupAssignment(new 
GroupAssignment(Collections.emptyMap()));
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVers

Re: [PR] KAFKA-16313: offline group protocol migration [kafka]

2024-03-07 Thread via GitHub


dongnuo123 commented on code in PR #15442:
URL: https://github.com/apache/kafka/pull/15442#discussion_r1516453885


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -9607,6 +9607,151 @@ public void 
testOnConsumerGroupStateTransitionOnLoading() {
 verify(context.metrics, 
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
 null);
 }
 
+@Test
+public void testMaybeUpgradeEmptyGroup() {
+String classicGroupId = "classic-group-id";
+String consumerGroupId = "consumer-group-id";
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10));
+
+// A consumer group can't be upgraded.
+List records = new ArrayList<>();
+context.groupMetadataManager.maybeUpgradeEmptyGroup(consumerGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// A non-empty classic group can't be upgraded.
+
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(PREPARING_REBALANCE);
+context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// An empty classic group can be upgraded.
+
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(EMPTY);
+context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, 
records);
+
assertEquals(Arrays.asList(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId)),
 records);
+}
+
+@Test
+public void testMaybeDowngradeEmptyGroup() {
+String classicGroupId = "classic-group-id";
+String consumerGroupId = "consumer-group-id";
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10));
+
+List records = new ArrayList<>();
+context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// A classic group can't be downgraded.
+context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+
+// An empty consumer group can be upgraded.
+context.groupMetadataManager.maybeDowngradeEmptyGroup(consumerGroupId, 
records);
+assertEquals(Arrays.asList(
+
RecordHelpers.newTargetAssignmentEpochTombstoneRecord(consumerGroupId),
+
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(consumerGroupId),
+RecordHelpers.newGroupEpochTombstoneRecord(consumerGroupId)), 
records);
+records.clear();
+
+// A non-empty consumer group can't be downgraded.
+ConsumerGroupMember.Builder memberBuilder = new 
ConsumerGroupMember.Builder(Uuid.randomUuid().toString());
+
context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, 
memberBuilder.build()));
+context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, 
records);
+assertEquals(Collections.emptyList(), records);
+}
+
+@Test
+public void testConsumerGroupHeartbeatWithEmptyClassicGroup() {
+String classicGroupId = "classic-group-id";
+String memberId = Uuid.randomUuid().toString();
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+assignor.prepareGroupAssignment(new 
GroupAssignment(Collections.emptyMap()));
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), Metadat

Re: [PR] KAFKA-16313: offline group protocol migration [kafka]

2024-03-07 Thread via GitHub


dajac commented on code in PR #15442:
URL: https://github.com/apache/kafka/pull/15442#discussion_r1515850376


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3500,6 +3503,56 @@ public void maybeDeleteGroup(String groupId, 
List records) {
 }
 }
 
+/**
+ * A group can be upgraded offline if it's a classic group and empty.
+ *
+ * @param groupId The group to be validated.
+ * @return true if the offline upgrade is valid.
+ */
+private boolean validateOfflineUpgrade(String groupId) {
+Group group = groups.get(groupId);
+return group != null && group.type() == CLASSIC && group.isEmpty();
+}
+
+/**
+ * A group can be downgraded offline if it's a consumer group and empty.
+ *
+ * @param groupId The group to be validated.
+ * @return true if the offline downgrade is valid.
+ */
+private boolean validateOfflineDowngrade(String groupId) {
+Group group = groups.get(groupId);
+return group != null && group.type() == CONSUMER && group.isEmpty();
+}
+
+/**
+ * Upgrade the empty group if it's valid.
+ *
+ * @param groupId The group id to be migrated.
+ * @param records The list of records to delete the previous group.
+ */
+public void maybeUpgradeEmptyGroup(String groupId, List records) {
+if (validateOfflineUpgrade(groupId)) {
+deleteGroup(groupId, records);
+removeGroup(groupId);

Review Comment:
   nit: Those two methods next to each other look a bit weird. I wonder if we 
should rename `deleteGroup` to `createGroupTombstoneRecords`.
   
   Another thing is that we actually get the group from the map in each method 
called here. I wonder if we should inline and simplify it. For instance, we 
could look up the group as the first thing in this method, check the condition, 
then call `createGroupTombstoneRecords` on the group and finally remove the 
group from the map. What do you think?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3500,6 +3503,56 @@ public void maybeDeleteGroup(String groupId, 
List records) {
 }
 }
 
+/**
+ * A group can be upgraded offline if it's a classic group and empty.
+ *
+ * @param groupId The group to be validated.
+ * @return true if the offline upgrade is valid.
+ */
+private boolean validateOfflineUpgrade(String groupId) {
+Group group = groups.get(groupId);
+return group != null && group.type() == CLASSIC && group.isEmpty();
+}
+
+/**
+ * A group can be downgraded offline if it's a consumer group and empty.
+ *
+ * @param groupId The group to be validated.
+ * @return true if the offline downgrade is valid.
+ */
+private boolean validateOfflineDowngrade(String groupId) {
+Group group = groups.get(groupId);
+return group != null && group.type() == CONSUMER && group.isEmpty();
+}
+
+/**
+ * Upgrade the empty group if it's valid.
+ *
+ * @param groupId The group id to be migrated.
+ * @param records The list of records to delete the previous group.
+ */
+public void maybeUpgradeEmptyGroup(String groupId, List records) {
+if (validateOfflineUpgrade(groupId)) {

Review Comment:
   nit: I wonder if naming it `isEmptyClassicGroup` would be better.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3500,6 +3503,56 @@ public void maybeDeleteGroup(String groupId, 
List records) {
 }
 }
 
+/**
+ * A group can be upgraded offline if it's a classic group and empty.
+ *
+ * @param groupId The group to be validated.
+ * @return true if the offline upgrade is valid.
+ */
+private boolean validateOfflineUpgrade(String groupId) {
+Group group = groups.get(groupId);
+return group != null && group.type() == CLASSIC && group.isEmpty();
+}
+
+/**
+ * A group can be downgraded offline if it's a consumer group and empty.
+ *
+ * @param groupId The group to be validated.
+ * @return true if the offline downgrade is valid.
+ */
+private boolean validateOfflineDowngrade(String groupId) {
+Group group = groups.get(groupId);
+return group != null && group.type() == CONSUMER && group.isEmpty();
+}
+
+/**
+ * Upgrade the empty group if it's valid.
+ *
+ * @param groupId The group id to be migrated.
+ * @param records The list of records to delete the previous group.
+ */
+public void maybeUpgradeEmptyGroup(String groupId, List records) {

Review Comment:
   nit: Could we keep it private? It does not seem necessary to expose it. I 
also wonder if we should name it `maybeDeleteEmptyClassicGroup`. What do you 
think?



##
group-coordinator/src/test/ja