dajac commented on code in PR #18024: URL: https://github.com/apache/kafka/pull/18024#discussion_r1868876963
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ########## @@ -608,9 +608,12 @@ private void updatePatternSubscription(Cluster cluster) { if (subscriptions.subscribeFromPattern(topicsToSubscribe)) { this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics(); - // Join the group if not already part of it, or just send the new subscription to the broker on the next poll. - requestManagers.consumerHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated(); } + // Join the group if not already part of it, or just send the updated subscription + // to the broker on the next poll. Note that this is done even if no topics matched + // the regex, to ensure the member joins the group if needed (with empty subscription). + requestManagers.consumerHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated(); + Review Comment: nit: We could remove this empty line. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java: ########## @@ -334,6 +335,31 @@ public void testTopicPatternSubscriptionChangeEvent() { assertDoesNotThrow(() -> event.future().get()); } + @Test + public void testTopicPatternSubscriptionTriggersJoin() { + TopicPatternSubscriptionChangeEvent event = new TopicPatternSubscriptionChangeEvent( + Pattern.compile("topic.*"), Optional.of(new MockRebalanceListener()), 12345); + setupProcessor(true); + Cluster cluster = mock(Cluster.class); + when(metadata.fetch()).thenReturn(cluster); + when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager); + + // Initial subscription where no topics match the pattern. Membership manager + // should still be notified so it joins if not in the group (with empty subscription). + when(subscriptionState.subscribeFromPattern(any())).thenReturn(false); + processor.process(event); + verify(membershipManager).onSubscriptionUpdated(); + + clearInvocations(membershipManager); + + // Subscription where some topics match so subscription is updated. Membership manager + // should be notified so it joins if not in the group. + when(subscriptionState.subscribeFromPattern(any())).thenReturn(true); + processor.process(event); + verify(membershipManager).onSubscriptionUpdated(); + Review Comment: nit: We could remove this empty line too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org