Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-24 Thread via GitHub


dajac commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1577407602


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java:
##
@@ -198,6 +207,43 @@ private static void assertApiMessageAndVersionEquals(
 }
 }
 }
+} else if (actual.message() instanceof GroupMetadataValue) {
+    GroupMetadataValue expectedValue = (GroupMetadataValue) expected.message().duplicate();
+    GroupMetadataValue actualValue = (GroupMetadataValue) actual.message().duplicate();
+
+    expectedValue.members().sort(Comparator.comparing(GroupMetadataValue.MemberMetadata::memberId));
+    actualValue.members().sort(Comparator.comparing(GroupMetadataValue.MemberMetadata::memberId));
+    try {
+        Arrays.asList(expectedValue, actualValue).forEach(value ->
+            value.members().forEach(memberMetadata -> {
+                // Sort topics and ownedPartitions in Subscription.
+                ConsumerPartitionAssignor.Subscription subscription =
+                    ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberMetadata.subscription()));
+                subscription.topics().sort(String::compareTo);
+                subscription.ownedPartitions().sort(
+                    Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)
+                );
+                memberMetadata.setSubscription(Utils.toArray(ConsumerProtocol.serializeSubscription(
+                    subscription,
+                    ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(memberMetadata.subscription()))
+                )));
+
+                // Sort partitions in Assignment.
+                ConsumerPartitionAssignor.Assignment assignment =
+                    ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(memberMetadata.assignment()));
+                assignment.partitions().sort(
+                    Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)
+                );
+                memberMetadata.setAssignment(Utils.toArray(ConsumerProtocol.serializeAssignment(
+                    assignment,
+                    ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(memberMetadata.assignment()))
+                )));
+            })
+        );
+    } catch (SchemaException ex) {
+        System.out.println("Failed deserialization: " + ex.getMessage());

Review Comment:
   nit: Sorry, I missed this one. We usually don't keep `println` in tests. It 
may be better to call `fail` with an appropriate message.
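   For illustration, a minimal sketch of the suggested change, assuming JUnit's
   `Assertions.fail` is statically imported in this test class:

       } catch (SchemaException ex) {
           // Surface deserialization problems as a test failure instead of printing to stdout.
           fail("Failed deserialization of group metadata value: " + ex.getMessage());
       }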






Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-23 Thread via GitHub


dajac commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1576113492


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1422,10 +1499,16 @@ private CoordinatorResult consumerGr
 ) throws ApiException {
     ConsumerGroup group = consumerGroup(groupId);
     List records;
+    CompletableFuture appendFuture = null;
     if (instanceId == null) {
         ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
         log.info("[GroupId {}] Member {} left the consumer group.", groupId, memberId);
-        records = consumerGroupFenceMember(group, member);
+        if (validateOnlineDowngrade(group, memberId)) {
+            records = new ArrayList<>();
+            appendFuture = convertToClassicGroup(group, memberId, records);
+        } else {
+            records = consumerGroupFenceMember(group, member);
+        }

Review Comment:
   Having this duplicated code in multiple places is a tad annoying, don't you 
think? I wonder if we could push it down into `consumerGroupFenceMember` as it 
seems that the downgrade must be checked whenever `consumerGroupFenceMember` is 
called. `consumerGroupFenceMember` could return a future or null and take the 
records as an argument.
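   For illustration, a hedged sketch of that shape; the record type and the
   `consumerGroupFenceMemberRecords` helper are placeholders, since the real
   signatures are elided in the quote above:

       // Hypothetical sketch: fold the downgrade check into the fencing path so every
       // caller gets it for free. Returns the append future when a downgrade is
       // triggered, or null when the member is fenced normally.
       private CompletableFuture<Void> consumerGroupFenceMember(
           ConsumerGroup group,
           ConsumerGroupMember member,
           List<Record> records // assumed record type
       ) {
           if (validateOnlineDowngrade(group, member.memberId())) {
               return convertToClassicGroup(group, member.memberId(), records);
           } else {
               records.addAll(consumerGroupFenceMemberRecords(group, member)); // hypothetical helper
               return null;
           }
       }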






Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-22 Thread via GitHub


dongnuo123 commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1574897141


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10331,6 +10333,461 @@ public void testClassicGroupOnUnloadedCompletingRebalance() throws Exception {
     .setErrorCode(NOT_COORDINATOR.code()), pendingMemberSyncResult.syncFuture.get());
 }
 
+@Test
+public void testLastClassicProtocolMemberLeavingConsumerGroup() {
+    String groupId = "group-id";
+    String memberId1 = Uuid.randomUuid().toString();
+    String memberId2 = Uuid.randomUuid().toString();
+
+    Uuid fooTopicId = Uuid.randomUuid();
+    String fooTopicName = "foo";
+    Uuid barTopicId = Uuid.randomUuid();
+    String barTopicName = "bar";
+    Uuid zarTopicId = Uuid.randomUuid();
+    String zarTopicName = "zar";
+
+    MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+    List protocols = Collections.singletonList(
+        new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+            .setName("range")
+            .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
+                Arrays.asList(fooTopicName, barTopicName),
+                null,
+                Arrays.asList(
+                    new TopicPartition(fooTopicName, 0),
+                    new TopicPartition(fooTopicName, 1),
+                    new TopicPartition(fooTopicName, 2),
+                    new TopicPartition(barTopicName, 0),
+                    new TopicPartition(barTopicName, 1)
+                )
+
+    );
+
+    ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1)
+        .setState(MemberState.STABLE)
+        .setMemberEpoch(10)
+        .setPreviousMemberEpoch(9)
+        .setClientId("client")
+        .setClientHost("localhost/127.0.0.1")
+        .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+        .setServerAssignorName("range")
+        .setRebalanceTimeoutMs(45000)
+        .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+            .setSupportedProtocols(protocols))
+        .setAssignedPartitions(mkAssignment(
+            mkTopicAssignment(fooTopicId, 0, 1, 2),
+            mkTopicAssignment(barTopicId, 0, 1)))
+        .build();
+    ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2)
+        .setState(MemberState.STABLE)
+        .setMemberEpoch(10)
+        .setPreviousMemberEpoch(9)
+        .setClientId("client")
+        .setClientHost("localhost/127.0.0.1")
+        // Use zar only here to ensure that metadata needs to be recomputed.
+        .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
+        .setServerAssignorName("range")
+        .setRebalanceTimeoutMs(45000)
+        .setAssignedPartitions(mkAssignment(
+            mkTopicAssignment(fooTopicId, 3, 4, 5),
+            mkTopicAssignment(barTopicId, 2)))
+        .build();
+
+    // Consumer group with two members.
+    // Member 1 uses the classic protocol and member 2 uses the consumer protocol.
+    GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+        .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+        .withAssignors(Collections.singletonList(assignor))
+        .withMetadataImage(new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addTopic(barTopicId, barTopicName, 3)
+            .addTopic(zarTopicId, zarTopicName, 1)
+            .addRacks()
+            .build())
+        .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+            .withMember(member1)
+            .withMember(member2)
+            .withAssignment(memberId1, mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .withAssignment(memberId2, mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .withAssignmentEpoch(10))
+        .build();
+
+    context.commit();
+    ConsumerGroup consumerGroup = context.groupMetadataManager.consumerGroup(groupId);
+
+    // Member 2 leaves the consumer group, triggering the downgrade.
+    CoordinatorResult result = context.consumerGroupHeartbeat(
+        new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId2)
+            .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)

Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-18 Thread via GitHub


dongnuo123 commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1571520271


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java:
##
@@ -198,6 +204,62 @@ private static void assertApiMessageAndVersionEquals(
 }
 }
 }
+} else if (actual.message() instanceof GroupMetadataValue) {

Review Comment:
   Yeah it's more convenient. Let me change this.






Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-18 Thread via GitHub


dongnuo123 commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1571469219


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -777,11 +778,78 @@ public ClassicGroup classicGroup(
 }
 }
 
+/**
+ * Validates the online downgrade if a consumer member is fenced from the consumer group.
+ *
+ * @param consumerGroup The ConsumerGroup.
+ * @param memberId      The fenced member id.
+ * @return A boolean indicating whether it's valid to online downgrade the consumer group.
+ */
+private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
+    if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+        log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
+            consumerGroup.groupId());
+        return false;
+    } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {

Review Comment:
   For an explicit leave-group request we know the remaining one is using the consumer 
group protocol. I'm not sure about the timeout cases; it could be an expired 
classic protocol member.
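   For reference, an illustrative sketch of what a check like
   `allMembersUseClassicProtocolExcept` could look like (hypothetical, not the
   actual ConsumerGroup implementation):

       // Illustrative only: true when every member other than the excluded one
       // advertises classic protocol metadata.
       public boolean allMembersUseClassicProtocolExcept(String memberId) {
           return members().values().stream()
               .filter(member -> !member.memberId().equals(memberId))
               .allMatch(member -> member.classicMemberMetadata() != null); // assumed accessor
       }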






Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-18 Thread via GitHub


dajac commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1571079283


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -777,11 +778,78 @@ public ClassicGroup classicGroup(
 }
 }
 
+/**
+ * Validates the online downgrade if a consumer member is fenced from the consumer group.
+ *
+ * @param consumerGroup The ConsumerGroup.
+ * @param memberId      The fenced member id.
+ * @return A boolean indicating whether it's valid to online downgrade the consumer group.
+ */
+private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
+    if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+        log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
+            consumerGroup.groupId());
+        return false;
+    } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+        log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() <= 1) {
+        log.info("Skip downgrading the consumer group {} to classic group because it's empty.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+        log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
+            consumerGroup.groupId());
+    }
+    return true;
+}
+
+public CompletableFuture convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List records) {
+    consumerGroup.createGroupTombstoneRecords(records);
+
+    ClassicGroup classicGroup;
+    try {
+        classicGroup = ClassicGroup.fromConsumerGroup(
+            consumerGroup,
+            leavingMemberId,
+            logContext,
+            time,
+            metrics,
+            consumerGroupSessionTimeoutMs,
+            metadataImage
+        );
+    } catch (SchemaException e) {
+        log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " +
+            "the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e);
+
+        throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.",
+            consumerGroup.groupId(), e.getMessage()));
+    }
+
+    classicGroup.createConsumerGroupRecords(metadataImage.features().metadataVersion(), records);
+
+    removeGroup(consumerGroup.groupId());
+
+    groups.put(consumerGroup.groupId(), classicGroup);
+    metrics.onClassicGroupStateTransition(null, classicGroup.currentState());
+
+    classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
+    prepareRebalance(classicGroup, String.format("Downgrade group %s.", classicGroup.groupId()));
+
+    CompletableFuture appendFuture = new CompletableFuture<>();
+    appendFuture.whenComplete((__, t) -> {

Review Comment:
   I wonder if we could use `exceptionally` here.
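   For illustration, a hedged sketch of that variant; `exceptionally` only runs on
   failure, so the success-path work would have to happen eagerly before the append:

       appendFuture.exceptionally(t -> {
           // Hypothetical failure handling; the real cleanup depends on what was
           // mutated before the append.
           log.warn("Failed to append downgrade records for group {}.", classicGroup.groupId(), t);
           return null;
       });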



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -777,11 +778,78 @@ public ClassicGroup classicGroup(
 }
 }
 
+/**
+ * Validates the online downgrade if a consumer member is fenced from the consumer group.
+ *
+ * @param consumerGroup The ConsumerGroup.
+ * @param memberId      The fenced member id.
+ * @return A boolean indicating whether it's valid to online downgrade the consumer group.
+ */
+private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
+    if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+        log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
+            consumerGroup.groupId());
+        return false;
+    } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+        log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() <= 1) {
+        log.info("Skip downgrading the consumer group {} to classic group because it's empty.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+        log.info("Cannot downgrade consumer group {} to classic group because its 

Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-17 Thread via GitHub


dongnuo123 commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1569413973


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -777,6 +778,59 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
+    if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+        log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
+            consumerGroup.groupId());
+        return false;
+    } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+        log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() <= 1) {
+        log.info("Skip downgrading the consumer group {} to classic group because it's empty.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+        log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
+            consumerGroup.groupId());
+    }
+    return true;
+}
+
+public CompletableFuture convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List records) {
+    consumerGroup.createGroupTombstoneRecords(records);
+    ClassicGroup classicGroup;
+    try {
+        classicGroup = consumerGroup.toClassicGroup(
+            leavingMemberId,
+            logContext,
+            time,
+            consumerGroupSessionTimeoutMs,
+            metadataImage,
+            records
+        );
+    } catch (SchemaException e) {
+        log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " +
+            "the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e);
+
+        throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.",
+            consumerGroup.groupId(), e.getMessage()));
+    }
+
+    groups.put(consumerGroup.groupId(), classicGroup);

Review Comment:
   Ah yes. Thanks for the catch!






Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-17 Thread via GitHub


dongnuo123 commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1569405350


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -775,6 +777,126 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
+    if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+        log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
+            consumerGroup.groupId());
+        return false;
+    } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+        log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() <= 1) {
+        log.info("Skip downgrading the consumer group {} to classic group because it's empty.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+        log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
+            consumerGroup.groupId());
+    }
+    return true;
+}
+
+public CompletableFuture convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List records) {
+    consumerGroup.createGroupTombstoneRecords(records);
+    ClassicGroup classicGroup;
+    try {
+        classicGroup = consumerGroup.toClassicGroup(
+            leavingMemberId,
+            logContext,
+            time,
+            consumerGroupSessionTimeoutMs,
+            metadataImage,
+            records
+        );
+    } catch (SchemaException e) {
+        log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " +
+            "the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e);
+
+        throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.",
+            consumerGroup.groupId(), e.getMessage()));
+    }
+
+    groups.put(consumerGroup.groupId(), classicGroup);
+    metrics.onClassicGroupStateTransition(null, classicGroup.currentState());
+
+    CompletableFuture appendFuture = new CompletableFuture<>();
+    appendFuture.whenComplete((__, t) -> {
+        if (t == null) {
+            classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
+            prepareRebalance(classicGroup, String.format("Downgrade group %s.", classicGroup.groupId()));

Review Comment:
   Yeah that makes sense.






Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-17 Thread via GitHub


dajac commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1569077848


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -777,6 +778,59 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
+    if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+        log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
+            consumerGroup.groupId());
+        return false;
+    } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+        log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() <= 1) {
+        log.info("Skip downgrading the consumer group {} to classic group because it's empty.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+        log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
+            consumerGroup.groupId());
+    }
+    return true;
+}
+
+public CompletableFuture convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List records) {
+    consumerGroup.createGroupTombstoneRecords(records);
+    ClassicGroup classicGroup;
+    try {
+        classicGroup = consumerGroup.toClassicGroup(
+            leavingMemberId,
+            logContext,
+            time,
+            consumerGroupSessionTimeoutMs,
+            metadataImage,
+            records
+        );
+    } catch (SchemaException e) {
+        log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " +
+            "the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e);
+
+        throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.",
+            consumerGroup.groupId(), e.getMessage()));
+    }
+
+    groups.put(consumerGroup.groupId(), classicGroup);

Review Comment:
   I think that we should explicitly remove the previous group before adding 
the new one because we update metrics when the previous group is removed. We 
could likely call `removeGroup` for this purpose.
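   A hedged sketch of the suggested ordering, assuming `removeGroup` is what
   updates the consumer group metrics on removal:

       // Remove the consumer group first so its metrics are decremented, then
       // register the freshly converted classic group.
       removeGroup(consumerGroup.groupId());
       groups.put(consumerGroup.groupId(), classicGroup);
       metrics.onClassicGroupStateTransition(null, classicGroup.currentState());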






Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-17 Thread via GitHub


dajac commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1569055496


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -998,4 +1094,232 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
     );
     return describedGroup;
 }
+
+/**
+ * Create a new consumer group according to the given classic group.
+ *
+ * @param snapshotRegistry  The SnapshotRegistry.
+ * @param metrics           The GroupCoordinatorMetricsShard.
+ * @param classicGroup      The converted classic group.
+ * @param topicsImage       The TopicsImage for topic id and topic name conversion.
+ * @return The created ConsumerGruop.
+ */
+public static ConsumerGroup fromClassicGroup(
+    SnapshotRegistry snapshotRegistry,
+    GroupCoordinatorMetricsShard metrics,
+    ClassicGroup classicGroup,
+    TopicsImage topicsImage
+) {
+    String groupId = classicGroup.groupId();
+    ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+    consumerGroup.setGroupEpoch(classicGroup.generationId());
+    consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+    classicGroup.allMembers().forEach(classicGroupMember -> {
+        ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(
+            ByteBuffer.wrap(classicGroupMember.assignment())
+        );
+        Map> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage);
+
+        ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(
+            ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get()))
+        );
+
+        // The target assignment and the assigned partitions of each member are set based on the last
+        // assignment of the classic group. All the members are put in the Stable state. If the classic
+        // group was in Preparing Rebalance or Completing Rebalance states, the classic members are
+        // asked to rejoin the group to re-trigger a rebalance or collect their assignments.
+        ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(classicGroupMember.memberId())
+            .setMemberEpoch(classicGroup.generationId())
+            .setState(MemberState.STABLE)
+            .setPreviousMemberEpoch(classicGroup.generationId())
+            .setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+            .setRackId(subscription.rackId().orElse(null))
+            .setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+            .setClientId(classicGroupMember.clientId())
+            .setClientHost(classicGroupMember.clientHost())
+            .setSubscribedTopicNames(subscription.topics())
+            .setAssignedPartitions(partitions)
+            .setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+            .build();
+        consumerGroup.updateTargetAssignment(newMember.memberId(), new Assignment(partitions));
+        consumerGroup.updateMember(newMember);
+    });
+
+    return consumerGroup;
+}
+
+/**
+ * Populate the record list with the records needed to create the given consumer group.
+ *
+ * @param records The list to which the new records are added.
+ */
+public void createConsumerGroupRecords(
+    List records
+) {
+    members().forEach((__, consumerGroupMember) ->
+        records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), consumerGroupMember))
+    );
+
+    records.add(RecordHelpers.newGroupEpochRecord(groupId(), groupEpoch()));
+
+    members().forEach((consumerGroupMemberId, consumerGroupMember) ->
+        records.add(RecordHelpers.newTargetAssignmentRecord(
+            groupId(),
+            consumerGroupMemberId,
+            targetAssignment(consumerGroupMemberId).partitions()
+        ))
+    );
+
+    records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), groupEpoch()));
+
+    members().forEach((__, consumerGroupMember) ->
+        records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), consumerGroupMember))
+    );
+}
+
+/**
+ * @return The map of topic id and partition set converted from the list of TopicPartition.
+ */
+private static Map> topicPartitionMapFromList(
+    List partitions,
+    TopicsImage topicsImage
+) {
+    Map> topicPartitionMap = new HashMap<>();
+    partitions.forEach(topicPartition -> {
+        TopicImage topicImage = topicsImage.getTopic(topicPartition.topic());
+        if (topicImage != null) {
+            topicPartitionMap

Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-17 Thread via GitHub


dajac commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1569051446


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -775,6 +777,126 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
+    if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+        log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
+            consumerGroup.groupId());
+        return false;
+    } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+        log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() <= 1) {
+        log.info("Skip downgrading the consumer group {} to classic group because it's empty.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+        log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
+            consumerGroup.groupId());
+    }
+    return true;
+}
+
+public CompletableFuture convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List records) {
+    consumerGroup.createGroupTombstoneRecords(records);
+    ClassicGroup classicGroup;
+    try {
+        classicGroup = consumerGroup.toClassicGroup(
+            leavingMemberId,
+            logContext,
+            time,
+            consumerGroupSessionTimeoutMs,
+            metadataImage,
+            records
+        );
+    } catch (SchemaException e) {
+        log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " +
+            "the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e);
+
+        throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.",
+            consumerGroup.groupId(), e.getMessage()));
+    }
+
+    groups.put(consumerGroup.groupId(), classicGroup);
+    metrics.onClassicGroupStateTransition(null, classicGroup.currentState());
+
+    CompletableFuture appendFuture = new CompletableFuture<>();
+    appendFuture.whenComplete((__, t) -> {
+        if (t == null) {
+            classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
+            prepareRebalance(classicGroup, String.format("Downgrade group %s.", classicGroup.groupId()));

Review Comment:
   I do agree with the `appendFuture` part of your explanation. However, I 
still believe that we should schedule the session timeouts and start the 
rebalance before it.
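   For illustration, a sketch of that ordering, simply moving the two calls from
   the quoted `whenComplete` block ahead of the future:

       // Schedule the classic session timeouts and kick off the rebalance right away,
       // before the records are appended.
       classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
       prepareRebalance(classicGroup, String.format("Downgrade group %s.", classicGroup.groupId()));

       CompletableFuture<Void> appendFuture = new CompletableFuture<>();
       // The future is then only needed for the failure path (reverting state).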







Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-17 Thread via GitHub


dongnuo123 commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1568962281


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -775,6 +777,126 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
+    if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+        log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
+            consumerGroup.groupId());
+        return false;
+    } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+        log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() <= 1) {
+        log.info("Skip downgrading the consumer group {} to classic group because it's empty.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+        log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
+            consumerGroup.groupId());
+    }
+    return true;
+}
+
+public CompletableFuture convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List records) {
+    consumerGroup.createGroupTombstoneRecords(records);
+    ClassicGroup classicGroup;
+    try {
+        classicGroup = consumerGroup.toClassicGroup(
+            leavingMemberId,
+            logContext,
+            time,
+            consumerGroupSessionTimeoutMs,
+            metadataImage,
+            records
+        );
+    } catch (SchemaException e) {
+        log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " +
+            "the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e);
+
+        throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.",
+            consumerGroup.groupId(), e.getMessage()));
+    }
+
+    groups.put(consumerGroup.groupId(), classicGroup);
+    metrics.onClassicGroupStateTransition(null, classicGroup.currentState());
+
+    CompletableFuture appendFuture = new CompletableFuture<>();
+    appendFuture.whenComplete((__, t) -> {
+        if (t == null) {
+            classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
+            prepareRebalance(classicGroup, String.format("Downgrade group %s.", classicGroup.groupId()));

Review Comment:
   If we create all the state immediately, we'll get an error when replaying the 
old ConsumerGroup tombstone because `group.get(groupId)` has become a 
ClassicGroup. We can't rely on replaying the records to update the state 
either, because we need the new classicGroup reference to trigger the 
rebalance. So the only way is not to replay the records, by setting the 
appendFuture.
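   For illustration, a hypothetical sketch of how the handler could hand the
   records back without replaying them; the CoordinatorResult constructor taking
   an append future is an assumption here:

       // Hypothetical sketch: records are written to the log but not replayed locally,
       // because the in-memory ClassicGroup was already built in convertToClassicGroup.
       List<Record> records = new ArrayList<>();
       CompletableFuture<Void> appendFuture = convertToClassicGroup(group, memberId, records);
       return new CoordinatorResult<>(records, response, appendFuture); // assumed constructor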






Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-17 Thread via GitHub


dongnuo123 commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1568963034


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -775,6 +777,126 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
+    if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+        log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
+            consumerGroup.groupId());
+        return false;
+    } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+        log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() <= 1) {
+        log.info("Skip downgrading the consumer group {} to classic group because it's empty.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+        log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
+            consumerGroup.groupId());
+    }
+    return true;
+}
+
+public CompletableFuture convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List records) {
+    consumerGroup.createGroupTombstoneRecords(records);
+    ClassicGroup classicGroup;
+    try {
+        classicGroup = consumerGroup.toClassicGroup(
+            leavingMemberId,
+            logContext,
+            time,
+            consumerGroupSessionTimeoutMs,
+            metadataImage,
+            records
+        );
+    } catch (SchemaException e) {
+        log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " +
+            "the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e);
+
+        throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.",
+            consumerGroup.groupId(), e.getMessage()));
+    }
+
+    groups.put(consumerGroup.groupId(), classicGroup);
+    metrics.onClassicGroupStateTransition(null, classicGroup.currentState());
+
+    CompletableFuture appendFuture = new CompletableFuture<>();
+    appendFuture.whenComplete((__, t) -> {
+        if (t == null) {
+            classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
+            prepareRebalance(classicGroup, String.format("Downgrade group %s.", classicGroup.groupId()));

Review Comment:
   Yes we should revert `onClassicGroupStateTransition`
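   For illustration, a hedged sketch of that revert in the failure path (the exact
   cleanup is an assumption):

       appendFuture.whenComplete((__, t) -> {
           if (t == null) {
               classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
               prepareRebalance(classicGroup, String.format("Downgrade group %s.", classicGroup.groupId()));
           } else {
               // Revert the eager state changes made before the append.
               metrics.onClassicGroupStateTransition(classicGroup.currentState(), null);
               groups.put(consumerGroup.groupId(), consumerGroup); // assumed: restore the consumer group
           }
       });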









Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-17 Thread via GitHub


dajac commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1568737120


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -775,6 +777,126 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
+    if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+        log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
+            consumerGroup.groupId());
+        return false;
+    } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+        log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() <= 1) {
+        log.info("Skip downgrading the consumer group {} to classic group because it's empty.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+        log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
+            consumerGroup.groupId());
+    }
+    return true;
+}
+
+public CompletableFuture convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List records) {
+    consumerGroup.createGroupTombstoneRecords(records);
+    ClassicGroup classicGroup;

Review Comment:
   nit: Let's add an empty line before this one.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -775,6 +777,126 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
+    if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+        log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
+            consumerGroup.groupId());
+        return false;
+    } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+        log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() <= 1) {
+        log.info("Skip downgrading the consumer group {} to classic group because it's empty.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+        log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
+            consumerGroup.groupId());
+    }
+    return true;
+}
+
+public CompletableFuture convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List records) {

Review Comment:
   Does it need to be public? Let's add some javadoc please.
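   For illustration, a hedged sketch of the requested change (the javadoc wording
   and the generic types are illustrative):

       /**
        * Converts the given consumer group into a classic group.
        *
        * @param consumerGroup   The consumer group to be converted.
        * @param leavingMemberId The member whose departure triggered the downgrade.
        * @param records         The list to which the conversion records are added.
        * @return The append future to complete once the records are written.
        */
       private CompletableFuture<Void> convertToClassicGroup(
           ConsumerGroup consumerGroup,
           String leavingMemberId,
           List<Record> records
       ) {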



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -9623,6 +10332,186 @@ public void testClassicGroupOnUnloadedCompletingRebalance() throws Exception {
     .setErrorCode(NOT_COORDINATOR.code()), pendingMemberSyncResult.syncFuture.get());
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testLastClassicProtocolMemberLeavingConsumerGroup(boolean appendLogSuccessfully) {

Review Comment:
   Should we also test the session expiration path and the rebalance expiration 
path?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -998,4 +1094,232 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
     );
     return describedGroup;
 }
+
+/**
+ * Create a new consumer group according to the given classic group.
+ *
+ * @param snapshotRegistry  The SnapshotRegistry.
+ * @param metrics           The GroupCoordinatorMetricsShard.
+ * @param classicGroup      The converted classic group.
+ * @param topicsImage       The TopicsImage for topic id and topic name conversion.
+ * @return The created ConsumerGruop.
+ */
+public static ConsumerGroup fromClassicGroup(
+    SnapshotRegistry snapshotRegistry,
+    GroupCoordinatorMetricsShard metrics,
+    ClassicGroup classicGroup,
+    TopicsImage topicsImage
+) {
+    String groupId = classicGroup.groupId();
+    ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+

[PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-15 Thread via GitHub


dongnuo123 opened a new pull request, #15721:
URL: https://github.com/apache/kafka/pull/15721

   Online downgrade from a consumer group to a classic group is triggered when 
the last consumer that uses the consumer protocol leaves the group. A rebalance 
is manually triggered after the group conversion. 
   
   This patch adds consumer group downgrade validation and conversion.
   
   https://issues.apache.org/jira/browse/KAFKA-16554
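   At a high level, the flow described above looks roughly like the sketch below;
   it paraphrases the handler logic quoted in the review comments earlier in this
   thread rather than introducing new behaviour:

       // On a leave-group heartbeat from the last consumer-protocol member:
       if (validateOnlineDowngrade(group, memberId)) {
           List<Record> records = new ArrayList<>();
           // Builds the ClassicGroup, writes the consumer group tombstones plus the
           // classic group record, and triggers a rebalance once the write succeeds.
           CompletableFuture<Void> appendFuture = convertToClassicGroup(group, memberId, records);
       } else {
           // Otherwise fall back to the normal member fencing path.
       }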
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

