dajac commented on code in PR #17008:
URL: https://github.com/apache/kafka/pull/17008#discussion_r1732747430


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2047,6 +2061,10 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro
                 scheduleConsumerGroupSyncTimeout(groupId, response.memberId(), request.rebalanceTimeoutMs());
 
                 responseFuture.complete(response);
+
+                // Maybe downgrade the consumer group if the last member using the
+                // consumer protocol is replaced by the joining member.
+                scheduleConsumerGroupDowngradeTimeout(groupId);

Review Comment:
   I wonder if we could avoid scheduling the async task when it is not necessary. For instance, we don't need it if the downgrade is disabled. Is it worth it?
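A minimal sketch of the early-return guard suggested above. It assumes a simplified model: the `MigrationPolicy` enum and `DowngradeScheduler` class are hypothetical stand-ins, not the coordinator's real types, and the `"downgrade-" + groupId` key format is made up for illustration. The scheduler returns before creating any task when downgrades are disabled.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the consumer group migration policy;
// only the downgrade flag matters for this sketch.
enum MigrationPolicy {
    DISABLED, UPGRADE, DOWNGRADE, BIDIRECTIONAL;

    boolean isDowngradeEnabled() {
        return this == DOWNGRADE || this == BIDIRECTIONAL;
    }
}

// Hypothetical coordinator fragment that records the keys of scheduled
// downgrade timeouts instead of running them.
class DowngradeScheduler {
    private final MigrationPolicy policy;
    final Set<String> scheduledKeys = new HashSet<>();

    DowngradeScheduler(MigrationPolicy policy) {
        this.policy = policy;
    }

    // Skips the async task entirely when it can never lead to a downgrade.
    void maybeScheduleDowngradeTimeout(String groupId) {
        if (!policy.isDowngradeEnabled()) {
            return;
        }
        scheduledKeys.add("downgrade-" + groupId);
    }
}
```

In this sketch, the validation performed when the task runs is left untouched. The early return only avoids creating a task that is guaranteed to be a no-op.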



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1046,43 +1046,58 @@ ShareGroup shareGroup(
     }
 
     /**
-     * Validates the online downgrade if a consumer member is fenced from the consumer group.
+     * Validates whether the group id is eligible for an online downgrade.
      *
-     * @param consumerGroup The ConsumerGroup.
-     * @param memberId      The fenced member id.
+     * @param consumerGroup The group to downgrade.
      * @return A boolean indicating whether it's valid to online downgrade the consumer group.
      */
-    private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
-        if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+    private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup) {
+        if (!consumerGroup.allMembersUseClassic()) {
             return false;
-        } else if (consumerGroup.numMembers() <= 1) {
+        } else if (consumerGroup.isEmpty()) {
             log.debug("Skip downgrading the consumer group {} to classic group because it's empty.",
                 consumerGroup.groupId());
             return false;
         } else if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
             log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
                 consumerGroup.groupId());
             return false;
-        } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+        } else if (consumerGroup.numMembers() > classicGroupMaxSize) {
             log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
                 consumerGroup.groupId());
             return false;
         }
         return true;
     }
 
+    /**
+     * Maybe downgrade the consumer group to a classic group if it's valid for online downgrade.
+     *
+     * @param groupId   The group id.
+     * @return The CoordinatorResult to be applied.
+     */
+    private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupDowngradeOperation(
+        String groupId
+    ) {
+        try {
+            ConsumerGroup consumerGroup = consumerGroup(groupId);
+            if (validateOnlineDowngrade(consumerGroup)) {
+                return convertToClassicGroup(consumerGroup);
+            }
+        } catch (GroupIdNotFoundException e) {
+            log.info("Cannot downgrade group {} because the group doesn't exist or it's not a consumer group.");

Review Comment:
   nit: Should we log in debug?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3126,6 +3147,23 @@ private void cancelConsumerGroupSyncTimeout(
         timer.cancel(consumerGroupSyncKey(groupId, memberId));
     }
 
+    /**
+     * Schedules the downgrade timeout for the consumer group.
+     *
+     * @param groupId The group id to downgrade.
+     */
+    private void scheduleConsumerGroupDowngradeTimeout(
+        String groupId
+    ) {
+        timer.schedule(
+            consumerGroupDowngradeKey(groupId),

Review Comment:
   Should we use `scheduleIfAbsent`? `scheduleConsumerGroupDowngradeTimeout` is called in multiple places, and we only need to schedule the task once.
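To illustrate the difference, here is a toy keyed timer. Without checking the coordinator's actual timer, it assumes that `schedule` replaces a pending timeout with the same key, pushing its deadline back, while `scheduleIfAbsent` leaves an existing one untouched. `KeyedTimer` is hypothetical and only tracks deadlines. It never runs an operation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory timer that keeps at most one deadline per key.
// It only tracks deadlines; no operation is ever executed.
class KeyedTimer {
    private final Map<String, Long> deadlines = new HashMap<>();
    private long nowMs = 0L;

    void advance(long ms) {
        nowMs += ms;
    }

    // Replaces any pending timeout for the key, which resets its deadline.
    void schedule(String key, long delayMs) {
        deadlines.put(key, nowMs + delayMs);
    }

    // Keeps a pending timeout if there is one, so repeated calls are no-ops.
    void scheduleIfAbsent(String key, long delayMs) {
        deadlines.putIfAbsent(key, nowMs + delayMs);
    }

    // Assumes the key has been scheduled; unboxing throws otherwise.
    long deadline(String key) {
        return deadlines.get(key);
    }
}
```

With the second style, calling the scheduling helper from several code paths still leaves a single pending task per key.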



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1046,43 +1046,58 @@ ShareGroup shareGroup(
     }
 
     /**
-     * Validates the online downgrade if a consumer member is fenced from the consumer group.
+     * Validates whether the group id is eligible for an online downgrade.
      *
-     * @param consumerGroup The ConsumerGroup.
-     * @param memberId      The fenced member id.
+     * @param consumerGroup The group to downgrade.
      * @return A boolean indicating whether it's valid to online downgrade the consumer group.
      */
-    private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
-        if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+    private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup) {
+        if (!consumerGroup.allMembersUseClassic()) {
             return false;
-        } else if (consumerGroup.numMembers() <= 1) {
+        } else if (consumerGroup.isEmpty()) {
             log.debug("Skip downgrading the consumer group {} to classic group because it's empty.",
                 consumerGroup.groupId());
             return false;
         } else if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
             log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
                 consumerGroup.groupId());
             return false;
-        } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+        } else if (consumerGroup.numMembers() > classicGroupMaxSize) {
             log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
                 consumerGroup.groupId());
             return false;
         }
         return true;
     }
 
+    /**
+     * Maybe downgrade the consumer group to a classic group if it's valid for online downgrade.
+     *
+     * @param groupId   The group id.
+     * @return The CoordinatorResult to be applied.
+     */
+    private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupDowngradeOperation(
+        String groupId
+    ) {
+        try {
+            ConsumerGroup consumerGroup = consumerGroup(groupId);
+            if (validateOnlineDowngrade(consumerGroup)) {
+                return convertToClassicGroup(consumerGroup);

Review Comment:
   In the non-static member case, I wonder if we now trigger two rebalances. When a member leaves, the group epoch is bumped when we fence it, which triggers a rebalance. Then, when the group is converted, we trigger another one. Is my understanding correct?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1046,43 +1046,58 @@ ShareGroup shareGroup(
     }
 
     /**
-     * Validates the online downgrade if a consumer member is fenced from the consumer group.
+     * Validates whether the group id is eligible for an online downgrade.
      *
-     * @param consumerGroup The ConsumerGroup.
-     * @param memberId      The fenced member id.
+     * @param consumerGroup The group to downgrade.
      * @return A boolean indicating whether it's valid to online downgrade the consumer group.
      */
-    private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
-        if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+    private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup) {
+        if (!consumerGroup.allMembersUseClassic()) {
             return false;
-        } else if (consumerGroup.numMembers() <= 1) {
+        } else if (consumerGroup.isEmpty()) {
             log.debug("Skip downgrading the consumer group {} to classic group because it's empty.",
                 consumerGroup.groupId());
             return false;
         } else if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
             log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
                 consumerGroup.groupId());
             return false;
-        } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+        } else if (consumerGroup.numMembers() > classicGroupMaxSize) {
             log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
                 consumerGroup.groupId());
             return false;
         }
         return true;
     }
 
+    /**
+     * Maybe downgrade the consumer group to a classic group if it's valid for online downgrade.
+     *
+     * @param groupId   The group id.
+     * @return The CoordinatorResult to be applied.
+     */
+    private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupDowngradeOperation(
+        String groupId
+    ) {
+        try {
+            ConsumerGroup consumerGroup = consumerGroup(groupId);
+            if (validateOnlineDowngrade(consumerGroup)) {
+                return convertToClassicGroup(consumerGroup);

Review Comment:
   `convertToClassicGroup` always triggers a rebalance. Do we still need it here? When the last static member is replaced, it does not seem necessary. We may need to check the group state to see whether the group is stable or rebalancing.
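One way to express the state check, as a sketch only. The hypothetical `GroupState` enum below is a simplified stand-in for the consumer group's states. The rule is an assumption drawn from the comment above, not the merged behavior: skip the rebalance only when the group is already stable, for example after the last static member was replaced in place.

```java
import java.util.EnumSet;

// Hypothetical, simplified consumer group states for this sketch.
enum GroupState { EMPTY, ASSIGNING, RECONCILING, STABLE }

final class DowngradeRebalanceRule {
    private DowngradeRebalanceRule() {}

    // States in which members are still converging on an assignment, so the
    // converted classic group would need a rebalance to settle it.
    private static final EnumSet<GroupState> CONVERGING =
        EnumSet.of(GroupState.ASSIGNING, GroupState.RECONCILING);

    // Sketch: trigger a rebalance after conversion only while converging;
    // a stable group keeps its current assignment. EMPTY does not reach
    // this point because validateOnlineDowngrade rejects empty groups.
    static boolean needsRebalanceAfterConversion(GroupState state) {
        return CONVERGING.contains(state);
    }
}
```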



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
