Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
cadonna merged PR #15585: URL: https://github.com/apache/kafka/pull/15585 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on PR #15585: URL: https://github.com/apache/kafka/pull/15585#issuecomment-2045768269 I created the follow-up task https://issues.apache.org/jira/browse/KAFKA-16493 to address that.
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
dajac commented on PR #15585: URL: https://github.com/apache/kafka/pull/15585#issuecomment-2045520630 @cadonna Yes, we can merge it and do a follow-up.
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
cadonna commented on PR #15585: URL: https://github.com/apache/kafka/pull/15585#issuecomment-2045503262 Could we merge this PR as it is and then fix the issue @dajac raised in a separate PR?
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on PR #15585: URL: https://github.com/apache/kafka/pull/15585#issuecomment-2045483420 Good catch @dajac, I missed that. Checking the metadata version in `maybeUpdateSubscriptionMetadata` before we do the actual pattern update would definitely save some unneeded regex checks, just like the legacy consumer does [here](https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L437). Makes total sense to me. What do you think, @Phuc-Hong-Tran?
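To make the suggestion concrete, here is a minimal sketch of gating the regex evaluation on the metadata version. It is only an illustration under stated assumptions: the class `VersionGatedPatternRefreshSketch`, its fields, and the method `maybeUpdateSubscription` are hypothetical names, not code from the Kafka client, and the metadata version is modelled as a plain non-negative `int`.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical sketch for illustration only; names do not come from the Kafka code base.
final class VersionGatedPatternRefreshSketch {
    private final Pattern pattern;
    private int lastSeenMetadataVersion = -1;          // assumption: real versions are >= 0
    private Set<String> subscription = Collections.emptySet();
    private int regexEvaluations = 0;                  // how often the regex was actually applied

    VersionGatedPatternRefreshSketch(Pattern pattern) {
        this.pattern = pattern;
    }

    // Meant to be called on every poll. Returns true only when the resolved subscription changed.
    boolean maybeUpdateSubscription(int metadataVersion, Set<String> topicsInMetadata) {
        if (metadataVersion == lastSeenMetadataVersion) {
            return false; // metadata unchanged: skip the regex matching entirely
        }
        lastSeenMetadataVersion = metadataVersion;
        regexEvaluations++;

        Set<String> matched = new TreeSet<>();
        for (String topic : topicsInMetadata) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        if (matched.equals(subscription)) {
            return false; // newer metadata, but the same set of matching topics
        }
        subscription = matched;
        return true;
    }

    int regexEvaluations() {
        return regexEvaluations;
    }

    Set<String> subscription() {
        return subscription;
    }
}
```

With this shape, repeated polls against the same metadata version cost one integer comparison, and a caller could use the boolean result to decide whether anything needs to be propagated.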
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
dajac commented on PR #15585: URL: https://github.com/apache/kafka/pull/15585#issuecomment-2045420088 @lianetm Thanks. So it seems that we call it on every poll. Aren't we concerned about the cost of matching the regex? In the legacy code, I think that we have a mechanism to refresh it only if the version of the metadata has changed. Do we have something similar here? Do we need it?
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on PR #15585: URL: https://github.com/apache/kafka/pull/15585#issuecomment-2045382837 Hey @dajac, we do have the same mechanism, calling `updatePatternSubscription` on a regular basis, as part of the consumer poll loop [here](https://github.com/apache/kafka/blob/4307840f100cd5dab18c7073aa2eaf060a997a92/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L723C17-L723C49). This ensures that if the set of topics that matches the regex changes (topics created or deleted), the subscription is updated.
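The refresh described above boils down to re-resolving the regex against the topic names in the latest metadata. A minimal sketch of that idea follows; `PatternResolutionSketch` and `matchingTopics` are hypothetical names for illustration, not part of the Kafka client, the topic names are borrowed from the integration test quoted elsewhere in this thread, and full-match semantics (`Matcher.matches()`) are assumed.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; this is not Kafka client code.
final class PatternResolutionSketch {

    // Returns the sorted set of known topic names that fully match the pattern.
    static Set<String> matchingTopics(Pattern pattern, Set<String> knownTopics) {
        Set<String> matched = new TreeSet<>();
        for (String topic : knownTopics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("t.*c");

        // Topics known before a new topic is created.
        Set<String> topics = new TreeSet<>(Arrays.asList("topic", "tblablac", "tblablak", "tblab1"));
        System.out.println(matchingTopics(pattern, topics)); // [tblablac, topic]

        // A newly created topic shows up in the next metadata snapshot.
        topics.add("tsomec");
        System.out.println(matchingTopics(pattern, topics)); // [tblablac, topic, tsomec]
    }
}
```

Because the resolution is recomputed from the full topic list each time, deleted topics drop out of the result the same way new ones appear.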
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
dajac commented on PR #15585: URL: https://github.com/apache/kafka/pull/15585#issuecomment-2045351051 Thanks for working on this one. I have one question regarding the implementation. In the legacy consumer, we have a mechanism to refresh the subscribed topics based on the regex. This is done to catch new topics and deleted topics. Do we have a similar mechanism in the new consumer?
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
cadonna commented on PR #15585: URL: https://github.com/apache/kafka/pull/15585#issuecomment-2045332239 I restarted the build since the Java 11 build crashed.
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
kirktrue commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1557728693 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1750,8 @@ private void subscribeInternal(Pattern pattern, Optional
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
Phuc-Hong-Tran commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1556146632 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1750,8 @@ private void subscribeInternal(Pattern pattern, Optional
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1556142547 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1750,8 @@ private void subscribeInternal(Pattern pattern, Optional
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
kirktrue commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1556133511 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1750,8 @@ private void subscribeInternal(Pattern pattern, Optional subscribedTopicNames = new TreeSet<>(this.subscriptions.subscription()); -if (sendAllFields || !subscribedTopicNames.equals(sentFields.subscribedTopicNames)) { -data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription())); -sentFields.subscribedTopicNames = subscribedTopicNames; -} -} else { -// SubscribedTopicRegex - only sent if it has changed since the last heartbeat -// - not supported yet +// SubscribedTopicNames - only sent if has changed since the last heartbeat +TreeSet subscribedTopicNames = new TreeSet<>(this.subscriptions.subscription()); +if (sendAllFields || !subscribedTopicNames.equals(sentFields.subscribedTopicNames)) { +data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription())); +sentFields.subscribedTopicNames = subscribedTopicNames; Review Comment: Nice! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on PR #15585: URL: https://github.com/apache/kafka/pull/15585#issuecomment-2043179367 @cadonna could you take a look at this one when you have a chance? Thanks!
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1556104853 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, Optional
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on PR #15585: URL: https://github.com/apache/kafka/pull/15585#issuecomment-2042958530 Hey @Phuc-Hong-Tran, thanks for the update, left some more comments. Almost there! Thanks!
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1555969469 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1751,16 +1753,7 @@ private void subscribeInternal(Pattern pattern, Optional
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1555915666 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1667,6 +1669,7 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd public boolean updateAssignmentMetadataIfNeeded(Timer timer) { maybeThrowFencedInstanceException(); maybeInvokeCommitCallbacks(); +maybeUpdateSubscriptionMetadata(); backgroundEventProcessor.process(); // Keeping this updateAssignmentMetadataIfNeeded wrapping up the updateFetchPositions as Review Comment: Since we're here, and adding logic that adds to the purpose of this `updateAssignmentMetadataIfNeeded`, we could clean up and remove this outdated comment.
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1555909363 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala: ## @@ -39,9 +39,8 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest { * metadata refresh the consumer becomes subscribed to this new topic and all partitions * of that topic are assigned to it. */ - // TODO: enable this test for the consumer group protocol when support for pattern subscriptions is implemented. @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) Review Comment: If we don't use the `getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly` function anymore, we should remove its definition from this file now.
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
Phuc-Hong-Tran commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1554474003 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, Optional
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
Phuc-Hong-Tran commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1554466415 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, Optional
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1550279550 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, Optionalhttps://github.com/apache/kafka/pull/15585#issuecomment-2021526537) that made you introduce the while loop: > there is a race condition bug where the metadata is not updated but the heartbeat request is already created, but it lacks required info
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on PR #15585: URL: https://github.com/apache/kafka/pull/15585#issuecomment-2035194359 Hey @Phuc-Hong-Tran, thanks for the updates! Left some comments.
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1550178486 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, Optionalhttps://github.com/apache/kafka/blob/cf2874a2db845daaed80808ce42b50fc05584fdb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1466) it means that, with the latest metadata, we discovered new topics that made the subscription change. So I would say that only at that point do we need to create the `SubscriptionChangeEvent` (and yes, then we also call `requestUpdateForNewTopics`). So, echoing my first comment on this thread, it seems to me that we shouldn't rely on any metadata object/version check here, as it might not be accurate (removing [this](https://github.com/apache/kafka/blob/cf2874a2db845daaed80808ce42b50fc05584fdb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1757-L1761)), and we should just make sure that we send the `SubscriptionChangeEvent` only when we know that the subscription changed, which is inside the `updatePatternSubscription` [if](https://github.com/apache/kafka/blob/cf2874a2db845daaed80808ce42b50fc05584fdb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1465). Makes sense?
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1549895965 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -159,6 +160,224 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumeAndVerifyRecords(consumer = consumer, numRecords = 1, startingOffset = 0, startingTimestamp = startingTimestamp) } + /** + * Verifies that pattern subscription performs as expected. + * The pattern matches the topics 'topic' and 'tblablac', but not 'tblablak' or 'tblab1'. + * It is expected that the consumer is subscribed to all partitions of 'topic' and + * 'tblablac' after the subscription when metadata is refreshed. + * When a new topic 'tsomec' is added afterwards, it is expected that upon the next + * metadata refresh the consumer becomes subscribed to this new topic and all partitions + * of that topic are assigned to it. + */ + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testPatternSubscription(quorum: String, groupProtocol: String): Unit = { +val numRecords = 1 +val producer = createProducer() +sendRecords(producer, numRecords, tp) + +val topic1 = "tblablac" // matches subscribed pattern +createTopic(topic1, 2, brokerCount) +sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0)) +sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1)) + +val topic2 = "tblablak" // does not match subscribed pattern +createTopic(topic2, 2, brokerCount) +sendRecords(producer,numRecords = 1000, new TopicPartition(topic2, 0)) +sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 1)) + +val topic3 = "tblab1" // does not match subscribed pattern +createTopic(topic3, 2, brokerCount) +sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 0)) +sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 1)) + +val consumer = 
createConsumer() +assertEquals(0, consumer.assignment().size) + +val pattern = Pattern.compile("t.*c") +consumer.subscribe(pattern, new TestConsumerReassignmentListener) + +var assignment = Set( + new TopicPartition(topic, 0), + new TopicPartition(topic, 1), + new TopicPartition(topic1, 0), + new TopicPartition(topic1, 1)) +awaitAssignment(consumer, assignment) + +val topic4 = "tsomec" // matches subscribed pattern +createTopic(topic4, 2, brokerCount) +sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 0)) +sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 1)) + +assignment ++= Set( + new TopicPartition(topic4, 0), + new TopicPartition(topic4, 1)) +awaitAssignment(consumer, assignment) + +consumer.unsubscribe() +assertEquals(0, consumer.assignment().size) + } + + /** + * Verifies that a second call to pattern subscription succeeds and performs as expected. + * The initial subscription is to a pattern that matches two topics 'topic' and 'foo'. + * The second subscription is to a pattern that matches 'foo' and a new topic 'bar'. + * It is expected that the consumer is subscribed to all partitions of 'topic' and 'foo' after + * the first subscription, and to all partitions of 'foo' and 'bar' after the second. + * The metadata refresh interval is intentionally increased to a large enough value to guarantee + * that it is the subscription call that triggers a metadata refresh, and not the timeout. 
+ */ + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testSubsequentPatternSubscription(quorum: String, groupProtocol: String): Unit = { +this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "3") +val consumer = createConsumer() + +val numRecords = 1 +val producer = createProducer() +sendRecords(producer, numRecords = numRecords, tp) + +// the first topic ('topic') matches first subscription pattern only + +val fooTopic = "foo" // matches both subscription patterns +createTopic(fooTopic, 1, brokerCount) +sendRecords(producer, numRecords = 1000, new TopicPartition(fooTopic, 0)) + +assertEquals(0, consumer.assignment().size) + +val pattern1 = Pattern.compile(".*o.*") // only 'topic' and 'foo' match this +consumer.subscribe(pattern1, new TestConsumerReassignmentListener) + +var assignment = Set( + new TopicPartition(topic, 0), + new TopicPartition(topic, 1), + new TopicPartition(fooTopic, 0)) +awaitAssignment(consumer, assignment) + +val barTopic = "bar" // matches the next subscription pattern +createTopic(barTopic, 1, brokerCount) +sendRecords(producer, numRecords =
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on PR #15585: URL: https://github.com/apache/kafka/pull/15585#issuecomment-2032154288 Hey @Phuc-Hong-Tran, any update on this one? Thanks!
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
Phuc-Hong-Tran commented on PR #15585: URL: https://github.com/apache/kafka/pull/15585#issuecomment-2021544620 Thanks for the comments @lianetm @kirktrue, I'll try to address those asap
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
Phuc-Hong-Tran commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1540153302 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, Optional
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
Phuc-Hong-Tran commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1540156766 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, Optional
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
Phuc-Hong-Tran commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1540153302 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, Optional
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
Phuc-Hong-Tran commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1540149023 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, Optional
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
Phuc-Hong-Tran commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1540147833 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1667,6 +1667,9 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd public boolean updateAssignmentMetadataIfNeeded(Timer timer) { maybeThrowFencedInstanceException(); maybeInvokeCommitCallbacks(); +if (subscriptions.hasPatternSubscription()) { +updatePatternSubscription(metadata.fetch()); +} Review Comment: @lianetm, I understand. Will fix this part
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
Phuc-Hong-Tran commented on PR #15585: URL: https://github.com/apache/kafka/pull/15585#issuecomment-2021526537 @kirktrue, regarding https://github.com/apache/kafka/pull/15585#discussion_r1539447102, there is a race condition bug where the metadata is not updated but the heartbeat request is already created, but it lacks required info, in this case the list of topics that the user wants to subscribe to. I created the `while` loop there so that we only proceed to create a heartbeat request once we have received the newly updated metadata.
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
Phuc-Hong-Tran commented on PR #15585: URL: https://github.com/apache/kafka/pull/15585#issuecomment-2021522732 @kirktrue, regarding comment https://github.com/apache/kafka/pull/15585#discussion_r1539447785, I decided to call `metadata.requestUpdateForNewTopics` since there is quite a delay between calling the function and actually receiving the updated metadata, and we're sending the `SubscriptionChangeEvent` right after this. Calling the function makes sure we get the required metadata before a heartbeat request is created.
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1539955531 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -550,6 +550,11 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { } else { // SubscribedTopicRegex - only sent if has changed since the last heartbeat // - not supported yet +TreeSet subscribedTopicNames = new TreeSet<>(this.subscriptions.subscription()); +if (sendAllFields || !subscribedTopicNames.equals(sentFields.subscribedTopicNames)) { +data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription())); +sentFields.subscribedTopicNames = subscribedTopicNames; +}
Review Comment: I guess the if/else was initially added to set up the structure for the 2 paths there will be:
1- send the broker the explicit topic names to subscribe to (this includes subscribe with a topic list and subscribe with a Pattern, which the client resolves to a topic list)
2- send the broker a regex, for the broker to resolve it (the new SubscriptionPattern)
So the key point is that, with the concepts above, up to and including this PR, we're still only supporting path 1. That's why I would say we had better get rid of the if/else and add it back only when we implement the 2nd path, once we have properly thought about the condition to use and can add a body to the else.
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1539955531 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -550,6 +550,11 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { } else { // SubscribedTopicRegex - only sent if has changed since the last heartbeat // - not supported yet +TreeSet subscribedTopicNames = new TreeSet<>(this.subscriptions.subscription()); +if (sendAllFields || !subscribedTopicNames.equals(sentFields.subscribedTopicNames)) { +data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription())); +sentFields.subscribedTopicNames = subscribedTopicNames; +} Review Comment: I guess the if/else was initially added to set the structure that there will be 2 paths: 1- send the broker the explicit topic names to subscribe to 2- send the broker a regex, for the broker to resolve it So, key point is that, with the concepts above, up to this PR (included), we're still only supporting path 1, so that's why I would say we better get rid of the if/else, and include it only when we do implement the 2nd path, and we properly think about the condition to use, and add a body to the else
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
kirktrue commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1539435022 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -550,6 +550,11 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { } else { // SubscribedTopicRegex - only sent if has changed since the last heartbeat // - not supported yet Review Comment: This comment would go away, if we do end up keeping a separate `else` block. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -550,6 +550,11 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { } else { // SubscribedTopicRegex - only sent if has changed since the last heartbeat // - not supported yet +TreeSet subscribedTopicNames = new TreeSet<>(this.subscriptions.subscription()); +if (sendAllFields || !subscribedTopicNames.equals(sentFields.subscribedTopicNames)) { +data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription())); +sentFields.subscribedTopicNames = subscribedTopicNames; +} Review Comment: Thanks for catching this, @lianetm. I agree that if the logic is identical, there's no need for duplication. Any speculation as to the reason the `if` statement existed in the first place? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1667,6 +1667,9 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd public boolean updateAssignmentMetadataIfNeeded(Timer timer) { maybeThrowFencedInstanceException(); maybeInvokeCommitCallbacks(); +if (subscriptions.hasPatternSubscription()) { +updatePatternSubscription(metadata.fetch()); +} Review Comment: Good call, @lianetm. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, Optional
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on PR #15585:
URL: https://github.com/apache/kafka/pull/15585#issuecomment-2018835239

Hey @Phuc-Hong-Tran, thanks a lot for the PR! I had a first pass and left some comments.
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1538180339

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1667,6 +1667,9 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd
     public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
         maybeThrowFencedInstanceException();
         maybeInvokeCommitCallbacks();
+        if (subscriptions.hasPatternSubscription()) {
+            updatePatternSubscription(metadata.fetch());
+        }

Review Comment:
Just for consistency, how about encapsulating this in something like `maybeUpdateSubscriptionMetadata`? It would align nicely with the methods above. It is also how the similar functionality is named in the legacy coordinator, so it would help readers see how that piece of logic translates into the new consumer.
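A minimal sketch of the encapsulation suggested in the comment above, assuming the helper only guards on the pattern subscription and then re-resolves the matching topics. Only `maybeUpdateSubscriptionMetadata` and `hasPatternSubscription` are names taken from the thread; the class `PatternSubscriptionSketch`, its fields, and the plain `Set<String>` of cluster topics are hypothetical stand-ins, not the actual `AsyncKafkaConsumer` code. Full-string matching via `Matcher.matches()` is also an assumption of this sketch.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical stand-in for the consumer's subscription handling.
class PatternSubscriptionSketch {
    // null while the consumer is not subscribed by pattern
    private Pattern subscribedPattern;
    // topics that matched the pattern on the most recent update
    private Set<String> resolvedTopics = new TreeSet<>();

    void subscribe(Pattern pattern) {
        this.subscribedPattern = pattern;
    }

    boolean hasPatternSubscription() {
        return subscribedPattern != null;
    }

    // Guard and update live in one helper, so the caller needs a single
    // "maybe..." call alongside its other maybe... steps.
    void maybeUpdateSubscriptionMetadata(Set<String> clusterTopics) {
        if (!hasPatternSubscription()) {
            return;
        }
        Set<String> matched = new TreeSet<>();
        for (String topic : clusterTopics) {
            // Assumption: the whole topic name must match the pattern.
            if (subscribedPattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        resolvedTopics = matched;
    }

    Set<String> resolvedTopics() {
        return resolvedTopics;
    }
}
```

With this shape, the call site in `updateAssignmentMetadataIfNeeded` would shrink to one line, and the pattern check would no longer be visible to the caller.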
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1538173798

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -550,6 +550,11 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
             } else {
                 // SubscribedTopicRegex - only sent if has changed since the last heartbeat
                 //                      - not supported yet
+                TreeSet<String> subscribedTopicNames = new TreeSet<>(this.subscriptions.subscription());
+                if (sendAllFields || !subscribedTopicNames.equals(sentFields.subscribedTopicNames)) {
+                    data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription()));
+                    sentFields.subscribedTopicNames = subscribedTopicNames;
+                }

Review Comment:
With this addition we end up with the exact same code repeated for the `if` and the `else`, so I would say we should find a better way of doing this. The first solution that comes to mind is to remove the if/else. In the end, we have a single case to handle here: send explicit subscriptions (topic names) to the broker. From the HB Mgr POV, and for the broker, it makes no difference whether the topic list came from a call to subscribe with topics or from a call to subscribe with a Pattern that we resolved internally on the client.

When we take on the next task of supporting the new regex, we'll actually have to send something different here, so we can decide then how best to differentiate the 2 cases. For now, including this PR, we support only 1 case for the subscription content we send in the HB. Makes sense?
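The single-case logic described in the comment above can be sketched as follows. This is a simplified illustration, not the real `HeartbeatRequestManager`: the class `HeartbeatSubscriptionSketch`, the method `topicNamesToSend`, and the `Optional` return value are hypothetical; only the "send if `sendAllFields` or the sorted topic set changed" rule comes from the diff in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for the heartbeat's "sent fields" bookkeeping.
class HeartbeatSubscriptionSketch {
    // Topic names included in the last heartbeat, or null if none was sent yet.
    private TreeSet<String> sentTopicNames = null;

    // Returns the topic names to include in the next heartbeat, or empty
    // when the field can be omitted because nothing changed. There is no
    // if/else on the subscription type: topic names resolved from a Pattern
    // are handled exactly like names passed explicitly.
    Optional<List<String>> topicNamesToSend(boolean sendAllFields, Set<String> subscription) {
        TreeSet<String> current = new TreeSet<>(subscription);
        if (sendAllFields || !current.equals(sentTopicNames)) {
            sentTopicNames = current;
            return Optional.of(new ArrayList<>(current));
        }
        return Optional.empty();
    }
}
```

Calling `topicNamesToSend(false, topics)` twice with the same set yields a value the first time and an empty `Optional` the second time; passing `sendAllFields = true` forces the names to be sent again.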
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1538161659

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -433,7 +433,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
    */
   // TODO: enable this test for the consumer group protocol when support for pattern subscriptions is implemented.

Review Comment:
ditto

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -486,7 +486,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
    */
   // TODO: enable this test for the consumer group protocol when support for pattern subscriptions is implemented.

Review Comment:
ditto
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1538161334

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -374,7 +374,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
    */
   // TODO: enable this test for the consumer group protocol when support for pattern subscriptions is implemented.

Review Comment:
we should remove this TODO now
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1538158322

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, Optional
[...] https://github.com/apache/kafka/blob/f8ce7feebcede6c6da93c649413a64695bc8d4c1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1466)) . What do you think?
[PR] KAFKA-15538: Client support for java regex based subscription [kafka]
Phuc-Hong-Tran opened a new pull request, #15585:
URL: https://github.com/apache/kafka/pull/15585

* Fully implemented subscription using Pattern for AsyncKafkaConsumer.
* Enabled related tests for subscription using Pattern in PlaintextConsumerTest.