squah-confluent commented on code in PR #21663:
URL: https://github.com/apache/kafka/pull/21663#discussion_r2905083993


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1311,12 +1312,14 @@ private void convertToClassicGroup(
 
         classicGroup.allMembers().forEach(member -> 
rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
 
-        // If the downgrade is triggered by a member leaving the group or a 
static
-        // member replacement with a different subscription, a rebalance 
should be triggered.
+        // If the downgrade is triggered by a member leaving the group or the
+        // assignment is stale, a rebalance should be triggered.
+        // When the downgrade is triggered by static member replacement,
+        // the group epoch is bumped so the assignment must be stale.
         if (joiningMember == null) {
             prepareRebalance(classicGroup, String.format("Downgrade group %s 
from consumer to classic for member leaving.", classicGroup.groupId()));
-        } else if (hasSubscriptionChanged) {
-            prepareRebalance(classicGroup, String.format("Downgrade group %s 
from consumer to classic for static member replacement with different 
subscription.", classicGroup.groupId()));
+        } else if (targetAssignmentEpoch < groupEpoch) {

Review Comment:
   Done, I moved the logic up a level.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to