dajac commented on code in PR #18024:
URL: https://github.com/apache/kafka/pull/18024#discussion_r1868876963


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -608,9 +608,12 @@ private void updatePatternSubscription(Cluster cluster) {
         if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
             this.metadataVersionSnapshot = 
metadata.requestUpdateForNewTopics();
 
-            // Join the group if not already part of it, or just send the new 
subscription to the broker on the next poll.
-            
requestManagers.consumerHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
         }
+        // Join the group if not already part of it, or just send the updated 
subscription
+        // to the broker on the next poll. Note that this is done even if no 
topics matched
+        // the regex, to ensure the member joins the group if needed (with 
empty subscription).
+        
requestManagers.consumerHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
+

Review Comment:
   nit: We could remove this empty line.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -334,6 +335,31 @@ public void testTopicPatternSubscriptionChangeEvent() {
         assertDoesNotThrow(() -> event.future().get());
     }
 
+    @Test
+    public void testTopicPatternSubscriptionTriggersJoin() {
+        TopicPatternSubscriptionChangeEvent event = new 
TopicPatternSubscriptionChangeEvent(
+            Pattern.compile("topic.*"), Optional.of(new 
MockRebalanceListener()), 12345);
+        setupProcessor(true);
+        Cluster cluster = mock(Cluster.class);
+        when(metadata.fetch()).thenReturn(cluster);
+        
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+
+        // Initial subscription where no topics match the pattern. Membership 
manager
+        // should still be notified so it joins if not in the group (with 
empty subscription).
+        when(subscriptionState.subscribeFromPattern(any())).thenReturn(false);
+        processor.process(event);
+        verify(membershipManager).onSubscriptionUpdated();
+
+        clearInvocations(membershipManager);
+
+        // Subscription where some topics match so subscription is updated. 
Membership manager
+        // should be notified so it joins if not in the group.
+        when(subscriptionState.subscribeFromPattern(any())).thenReturn(true);
+        processor.process(event);
+        verify(membershipManager).onSubscriptionUpdated();
+

Review Comment:
   nit: We could remove this empty line too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to