dajac commented on code in PR #17008: URL: https://github.com/apache/kafka/pull/17008#discussion_r1732747430
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2047,6 +2061,10 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro scheduleConsumerGroupSyncTimeout(groupId, response.memberId(), request.rebalanceTimeoutMs()); responseFuture.complete(response); + + // Maybe downgrade the consumer group if the last member using the + // consumer protocol is replaced by the joining member. + scheduleConsumerGroupDowngradeTimeout(groupId); Review Comment: I wonder if we could avoid scheduling the async tasks when they are not necessary. For instance, we don't need them if the downgrade is disabled. Is it worth it? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1046,43 +1046,58 @@ ShareGroup shareGroup( } /** - * Validates the online downgrade if a consumer member is fenced from the consumer group. + * Validates whether the group id is eligible for an online downgrade. * - * @param consumerGroup The ConsumerGroup. - * @param memberId The fenced member id. + * @param consumerGroup The group to downgrade. * @return A boolean indicating whether it's valid to online downgrade the consumer group.
*/ - private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) { - if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) { + private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup) { + if (!consumerGroup.allMembersUseClassic()) { return false; - } else if (consumerGroup.numMembers() <= 1) { + } else if (consumerGroup.isEmpty()) { log.debug("Skip downgrading the consumer group {} to classic group because it's empty.", consumerGroup.groupId()); return false; } else if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) { log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", consumerGroup.groupId()); return false; - } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) { + } else if (consumerGroup.numMembers() > classicGroupMaxSize) { log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.", consumerGroup.groupId()); return false; } return true; } + /** + * Maybe downgrade the consumer group to a classic group if it's valid for online downgrade. + * + * @param groupId The group id. + * @return The CoordinatorResult to be applied. + */ + private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupDowngradeOperation( + String groupId + ) { + try { + ConsumerGroup consumerGroup = consumerGroup(groupId); + if (validateOnlineDowngrade(consumerGroup)) { + return convertToClassicGroup(consumerGroup); + } + } catch (GroupIdNotFoundException e) { + log.info("Cannot downgrade group {} because the group doesn't exist or it's not a consumer group."); Review Comment: nit: Should we log in debug? 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3126,6 +3147,23 @@ private void cancelConsumerGroupSyncTimeout( timer.cancel(consumerGroupSyncKey(groupId, memberId)); } + /** + * Schedules the downgrade timeout for the consumer group. + * + * @param groupId The group id to downgrade. + */ + private void scheduleConsumerGroupDowngradeTimeout( + String groupId + ) { + timer.schedule( + consumerGroupDowngradeKey(groupId), Review Comment: Should we use `scheduleIfAbsent`? `scheduleConsumerGroupDowngradeTimeout` is called in multiple places and we only need it once. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1046,43 +1046,58 @@ ShareGroup shareGroup( } /** - * Validates the online downgrade if a consumer member is fenced from the consumer group. + * Validates whether the group id is eligible for an online downgrade. * - * @param consumerGroup The ConsumerGroup. - * @param memberId The fenced member id. + * @param consumerGroup The group to downgrade. * @return A boolean indicating whether it's valid to online downgrade the consumer group. 
*/ - private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) { - if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) { + private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup) { + if (!consumerGroup.allMembersUseClassic()) { return false; - } else if (consumerGroup.numMembers() <= 1) { + } else if (consumerGroup.isEmpty()) { log.debug("Skip downgrading the consumer group {} to classic group because it's empty.", consumerGroup.groupId()); return false; } else if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) { log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", consumerGroup.groupId()); return false; - } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) { + } else if (consumerGroup.numMembers() > classicGroupMaxSize) { log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.", consumerGroup.groupId()); return false; } return true; } + /** + * Maybe downgrade the consumer group to a classic group if it's valid for online downgrade. + * + * @param groupId The group id. + * @return The CoordinatorResult to be applied. + */ + private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupDowngradeOperation( + String groupId + ) { + try { + ConsumerGroup consumerGroup = consumerGroup(groupId); + if (validateOnlineDowngrade(consumerGroup)) { + return convertToClassicGroup(consumerGroup); Review Comment: In the non-static member case, I wonder if we actually trigger two rebalances now. When a member leaves, the group epoch is bumped when we fence it. This triggers a rebalance. Then, when the group is converted, we trigger another one. Is my understanding correct?
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1046,43 +1046,58 @@ ShareGroup shareGroup( } /** - * Validates the online downgrade if a consumer member is fenced from the consumer group. + * Validates whether the group id is eligible for an online downgrade. * - * @param consumerGroup The ConsumerGroup. - * @param memberId The fenced member id. + * @param consumerGroup The group to downgrade. * @return A boolean indicating whether it's valid to online downgrade the consumer group. */ - private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) { - if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) { + private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup) { + if (!consumerGroup.allMembersUseClassic()) { return false; - } else if (consumerGroup.numMembers() <= 1) { + } else if (consumerGroup.isEmpty()) { log.debug("Skip downgrading the consumer group {} to classic group because it's empty.", consumerGroup.groupId()); return false; } else if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) { log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", consumerGroup.groupId()); return false; - } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) { + } else if (consumerGroup.numMembers() > classicGroupMaxSize) { log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.", consumerGroup.groupId()); return false; } return true; } + /** + * Maybe downgrade the consumer group to a classic group if it's valid for online downgrade. + * + * @param groupId The group id. + * @return The CoordinatorResult to be applied. 
+ */ + private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupDowngradeOperation( + String groupId + ) { + try { + ConsumerGroup consumerGroup = consumerGroup(groupId); + if (validateOnlineDowngrade(consumerGroup)) { + return convertToClassicGroup(consumerGroup); Review Comment: `convertToClassicGroup` always triggers a rebalance. Do we still need it? When the last static member is replaced, it does not seem necessary. We may need to check the state to see whether the group is stable or rebalancing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org