jeffkbkim commented on code in PR #17976:
URL: https://github.com/apache/kafka/pull/17976#discussion_r1868182101


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3038,6 +3042,59 @@ private void replaceMember(
         ));
     }
 
+    /**
+     * Maybe delete the resolved regular expression associated to the provided 
member if
+     * it was the last subscribed member to it.
+     *
+     * @param records   The record accumulator.
+     * @param group     The group.
+     * @param member    The member removed from the group.
+     */
+    private void maybeDeleteResolvedRegularExpression(
+        List<CoordinatorRecord> records,
+        ConsumerGroup group,
+        ConsumerGroupMember member
+    ) {
+        if (isNotEmpty(member.subscribedTopicRegex()) && 
group.numSubscribedMembers(member.subscribedTopicRegex()) == 1) {
+            records.add(newConsumerGroupRegularExpressionTombstone(
+                group.groupId(),
+                member.subscribedTopicRegex()
+            ));
+        }
+    }
+
+    /**
+     * Maybe delete the resolved regular expressions associated with the 
provided members
+     * if they were the last ones subscribed to them.
+     *
+     * @param records   The record accumulator.
+     * @param group     The group.
+     * @param members   The member removed from the group.
+     * @return The set of deleted regular expressions.
+     */
+    private Set<String> maybeDeleteResolvedRegularExpressions(

Review Comment:
   should we add a test for this? or include in another test where a single 
member is subscribed to topic "foo" as both a topic name and as a regex, i.e. 
"foo" and "foo*"



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -655,24 +722,28 @@ public void createGroupTombstoneRecordsWithReplacedMember(
         String leavingMemberId,
         String joiningMemberId
     ) {
-        members().forEach((memberId, __) -> {
+        members.keySet().forEach(memberId -> {
             String removedMemberId = memberId.equals(leavingMemberId) ? 
joiningMemberId : memberId;
-            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId(),
 removedMemberId));
+            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 removedMemberId));
         });
 
-        members().forEach((memberId, __) -> {
+        members.keySet().forEach(memberId -> {
             String removedMemberId = memberId.equals(leavingMemberId) ? 
joiningMemberId : memberId;
-            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId(),
 removedMemberId));
+            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 removedMemberId));
         });
-        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId()));
+        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId));
 
-        members().forEach((memberId,  __) -> {
+        members.keySet().forEach(memberId -> {
             String removedMemberId = memberId.equals(leavingMemberId) ? 
joiningMemberId : memberId;
-            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId(),
 removedMemberId));
+            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 removedMemberId));
         });
 
-        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId()));
-        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId()));
+        resolvedRegularExpressions.keySet().forEach(regex ->
+            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
 regex))
+        );
+
+        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId));
+        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId));

Review Comment:
    i recall we no longer should care about ordering of record types. is this 
still the case?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3224,6 +3228,59 @@ private void replaceMember(
         ));
     }
 
+    /**
+     * Maybe delete the resolved regular expression associated to the provided 
member if
+     * it was the last subscribed member to it.
+     *
+     * @param records   The record accumulator.
+     * @param group     The group.
+     * @param member    The member removed from the group.
+     */
+    private void maybeDeleteResolvedRegularExpression(

Review Comment:
   can we share implementations in these two methods?



##########
core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala:
##########
@@ -660,6 +660,24 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
     )
   }
 
+  protected def classicLeaveGroup(

Review Comment:
   what's the difference between this and leaveGroupWithOldProtocol except for 
the expected response?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to