Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-10 Thread via GitHub


cadonna merged PR #15585:
URL: https://github.com/apache/kafka/pull/15585


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-09 Thread via GitHub


lianetm commented on PR #15585:
URL: https://github.com/apache/kafka/pull/15585#issuecomment-2045768269

   I created the follow-up task 
https://issues.apache.org/jira/browse/KAFKA-16493 to address that. 





Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-09 Thread via GitHub


dajac commented on PR #15585:
URL: https://github.com/apache/kafka/pull/15585#issuecomment-2045520630

   @cadonna Yes, we can merge it and do a follow-up.





Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-09 Thread via GitHub


cadonna commented on PR #15585:
URL: https://github.com/apache/kafka/pull/15585#issuecomment-2045503262

   Could we merge this PR as it is and then fix the issue in a separate PR, 
@dajac?





Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-09 Thread via GitHub


lianetm commented on PR #15585:
URL: https://github.com/apache/kafka/pull/15585#issuecomment-2045483420

   Good catch @dajac, I missed that. Checking the metadata version in 
`maybeUpdateSubscriptionMetadata` before we do the actual pattern update would 
definitely save some unneeded regex checks, just like the legacy consumer does 
[here](https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L437).
 That makes total sense to me. What do you think @Phuc-Hong-Tran? 
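For readers following the cost concern, here is a minimal sketch of the version check, not the actual implementation. It assumes the client metadata exposes a version number that changes whenever new metadata arrives; `VersionGatedPatternRefresher`, `maybeRefresh` and the integer version are hypothetical names, and full-name matching via `Pattern.matcher(topic).matches()` is an assumption of the sketch.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical sketch, not Kafka's internal API: re-run the regex only when the
// metadata version differs from the last version we evaluated against.
class VersionGatedPatternRefresher {
    private final Pattern pattern;
    private int lastSeenMetadataVersion = -1; // -1: nothing evaluated yet
    private Set<String> subscribedTopics = new TreeSet<>();
    private int regexEvaluations = 0;         // how many times the regex actually ran

    VersionGatedPatternRefresher(Pattern pattern) {
        this.pattern = pattern;
    }

    // Intended to be called on every poll; returns true only if the matched set changed.
    boolean maybeRefresh(int metadataVersion, Set<String> clusterTopics) {
        if (metadataVersion == lastSeenMetadataVersion) {
            return false; // same metadata as last time: skip the regex checks
        }
        lastSeenMetadataVersion = metadataVersion;
        regexEvaluations++;
        Set<String> matched = new TreeSet<>();
        for (String topic : clusterTopics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        boolean changed = !matched.equals(subscribedTopics);
        subscribedTopics = matched;
        return changed;
    }

    Set<String> subscribedTopics() {
        return subscribedTopics;
    }

    int regexEvaluations() {
        return regexEvaluations;
    }
}
```

Calling `maybeRefresh` on every poll with an unchanged version returns immediately, so the regex runs once per metadata update rather than once per poll.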





Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-09 Thread via GitHub


dajac commented on PR #15585:
URL: https://github.com/apache/kafka/pull/15585#issuecomment-2045420088

   @lianetm Thanks. So it seems that we call it on every poll. Aren't we 
concerned about the cost of matching the regex? In the legacy code, I think 
that we have a mechanism to refresh it only if the version of the metadata has 
changed. Do we have something similar here? Do we need it?





Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-09 Thread via GitHub


lianetm commented on PR #15585:
URL: https://github.com/apache/kafka/pull/15585#issuecomment-2045382837

   Hey @dajac , we do have the same mechanism, calling 
`updatePatternSubscription` on a regular basis, as part of the consumer poll 
loop 
[here](https://github.com/apache/kafka/blob/4307840f100cd5dab18c7073aa2eaf060a997a92/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L723C17-L723C49).
 This ensures that is the set of topics that mathches the regex changes (topics 
creates or deleted), the subscription is updated. 
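To make the refresh described above concrete, the following is a hedged sketch of what a periodic re-evaluation has to compute: the topics that currently match, plus what was added or removed since the last check. `PatternMatchDiff` and its methods are hypothetical helpers, not the body of `updatePatternSubscription`, and full-name matching is an assumption of the sketch.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical helpers illustrating what a periodic pattern re-evaluation has to
// compute; this is not the body of Kafka's updatePatternSubscription.
final class PatternMatchDiff {
    private PatternMatchDiff() {}

    // All topics currently in the cluster whose full name matches the pattern.
    static Set<String> matchingTopics(Pattern pattern, Collection<String> clusterTopics) {
        Set<String> matched = new TreeSet<>();
        for (String topic : clusterTopics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    // Topics that match now but did not before (e.g. newly created topics).
    static Set<String> added(Set<String> before, Set<String> after) {
        Set<String> result = new TreeSet<>(after);
        result.removeAll(before);
        return result;
    }

    // Topics that matched before but no longer do (e.g. deleted topics).
    static Set<String> removed(Set<String> before, Set<String> after) {
        Set<String> result = new TreeSet<>(before);
        result.removeAll(after);
        return result;
    }
}
```

With the topic names from `testPatternSubscription`, `t.*c` matches `topic` and `tblablac` but not `tblablak` or `tblab1`; once `tsomec` is created it shows up in `added`, and a deleted matching topic shows up in `removed`.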





Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-09 Thread via GitHub


dajac commented on PR #15585:
URL: https://github.com/apache/kafka/pull/15585#issuecomment-2045351051

   Thanks for working on this one. I have one question regarding the 
implementation. In the legacy consumer, we have a mechanism to refresh the 
subscribed topics based on the regex. This is done to catch new topics and 
deleted topics. Do we have a similar mechanism in the new consumer?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-09 Thread via GitHub


cadonna commented on PR #15585:
URL: https://github.com/apache/kafka/pull/15585#issuecomment-2045332239

   I restarted the build since the Java 11 build crashed.





Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-09 Thread via GitHub


kirktrue commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1557728693


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1750,8 @@ private void subscribeInternal(Pattern pattern, 
Optional

Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-08 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1556146632


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1750,8 @@ private void subscribeInternal(Pattern pattern, 
Optional

Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-08 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1556142547


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1750,8 @@ private void subscribeInternal(Pattern pattern, 
Optional

Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-08 Thread via GitHub


kirktrue commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1556133511


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1750,8 @@ private void subscribeInternal(Pattern pattern, 
Optional subscribedTopicNames = new 
TreeSet<>(this.subscriptions.subscription());
-if (sendAllFields || !subscribedTopicNames.equals(sentFields.subscribedTopicNames)) {
-data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription()));
-sentFields.subscribedTopicNames = subscribedTopicNames;
-}
-} else {
-// SubscribedTopicRegex - only sent if it has changed since the last heartbeat
-//  - not supported yet
+// SubscribedTopicNames - only sent if has changed since the last heartbeat
+TreeSet<String> subscribedTopicNames = new TreeSet<>(this.subscriptions.subscription());
+if (sendAllFields || !subscribedTopicNames.equals(sentFields.subscribedTopicNames)) {
+data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription()));
+sentFields.subscribedTopicNames = subscribedTopicNames;

Review Comment:
   Nice!






Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-08 Thread via GitHub


lianetm commented on PR #15585:
URL: https://github.com/apache/kafka/pull/15585#issuecomment-2043179367

   @cadonna could you take a look at this one when you have a chance? Thanks!





Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-08 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1556104853


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, 
Optional

Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-08 Thread via GitHub


lianetm commented on PR #15585:
URL: https://github.com/apache/kafka/pull/15585#issuecomment-2042958530

   Hey @Phuc-Hong-Tran, thanks for the update, left some more comments. Almost 
there! Thanks!





Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-08 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1555969469


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1751,16 +1753,7 @@ private void subscribeInternal(Pattern pattern, 
Optional

Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-08 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1555915666


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1667,6 +1669,7 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd
 public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
 maybeThrowFencedInstanceException();
 maybeInvokeCommitCallbacks();
+maybeUpdateSubscriptionMetadata();
 backgroundEventProcessor.process();
 
 // Keeping this updateAssignmentMetadataIfNeeded wrapping up the updateFetchPositions as

Review Comment:
   Since we're here, adding logic that extends the purpose of 
`updateAssignmentMetadataIfNeeded`, we could also clean up and remove this 
outdated comment. 






Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-08 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1555909363


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala:
##
@@ -39,9 +39,8 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
* metadata refresh the consumer becomes subscribed to this new topic and all partitions
* of that topic are assigned to it.
*/
-  // TODO: enable this test for the consumer group protocol when support for pattern subscriptions is implemented.
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))

Review Comment:
   If we don't use the 
`getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly` function 
anymore, we should remove its definition from this file now. 






Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-05 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1554474003


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, 
Optional

Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-05 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1554466415


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, 
Optional

Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-03 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1550279550


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, 
Optionalhttps://github.com/apache/kafka/pull/15585#issuecomment-2021526537) that 
made you introduce the while loop:
   
   > there is a race condition bug where the metadata is not updated but the 
heartbeat request is already created, but it lacks required info 






Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-03 Thread via GitHub


lianetm commented on PR #15585:
URL: https://github.com/apache/kafka/pull/15585#issuecomment-2035194359

   Hey @Phuc-Hong-Tran, thanks for the updates! Left some comments. 





Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-03 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1550178486


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, 
Optionalhttps://github.com/apache/kafka/blob/cf2874a2db845daaed80808ce42b50fc05584fdb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1466)
 it means that, with the latest metadata, we discovered new topics that made 
the subscription change. So only at that point do we need to create the 
`SubscriptionChangeEvent` I would say (and yes, then we also call 
`requestUpdateForNewTopics`)
   
   So, echoing my first comment on this thread, it seems to me that we shouldn't 
rely on any metadata object/version check here, as it may not be accurate 
(removing 
[this](https://github.com/apache/kafka/blob/cf2874a2db845daaed80808ce42b50fc05584fdb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1757-L1761)),
 and we should just make sure that we send the `SubscriptionChangeEvent` only 
when we know that the subscription changed, which is inside the 
`updatePatternSubscription` 
[if](https://github.com/apache/kafka/blob/cf2874a2db845daaed80808ce42b50fc05584fdb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1465).
   
   Makes sense?
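A minimal sketch of the suggestion above, under the assumption that the subscription update reports whether the topic set changed, so the event is enqueued only on a real change. `ChangeGatedPatternSubscription`, its nested `SubscriptionChangeEvent` placeholder and the queue are hypothetical stand-ins for the consumer's internals, not Kafka's classes.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: enqueue a subscription-change event only when the
// resolved topic set actually changed. The nested SubscriptionChangeEvent is a
// local placeholder, not Kafka's class of the same name.
class ChangeGatedPatternSubscription {
    static final class SubscriptionChangeEvent {
        final Set<String> topics;

        SubscriptionChangeEvent(Set<String> topics) {
            this.topics = topics;
        }
    }

    private Set<String> subscription = new TreeSet<>();
    final Queue<SubscriptionChangeEvent> eventQueue = new ArrayDeque<>();

    // Returns true only if the new topic set differs from the current one.
    private boolean applyMatchedTopics(Set<String> matchedTopics) {
        if (matchedTopics.equals(subscription)) {
            return false;
        }
        subscription = new TreeSet<>(matchedTopics);
        return true;
    }

    void updatePatternSubscription(Set<String> matchedTopics) {
        if (applyMatchedTopics(matchedTopics)) {
            // Only here would we also request a metadata update for the new topics.
            eventQueue.add(new SubscriptionChangeEvent(Set.copyOf(subscription)));
        }
    }
}
```

Re-applying the same matched set leaves the queue untouched, which is the behavior argued for here, independent of any metadata version check.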






Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-03 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1549895965


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -159,6 +160,224 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 consumeAndVerifyRecords(consumer = consumer, numRecords = 1, startingOffset = 0, startingTimestamp = startingTimestamp)
   }
 
+  /**
+   * Verifies that pattern subscription performs as expected.
+   * The pattern matches the topics 'topic' and 'tblablac', but not 'tblablak' or 'tblab1'.
+   * It is expected that the consumer is subscribed to all partitions of 'topic' and
+   * 'tblablac' after the subscription when metadata is refreshed.
+   * When a new topic 'tsomec' is added afterwards, it is expected that upon the next
+   * metadata refresh the consumer becomes subscribed to this new topic and all partitions
+   * of that topic are assigned to it.
+   */
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testPatternSubscription(quorum: String, groupProtocol: String): Unit = {
+val numRecords = 1
+val producer = createProducer()
+sendRecords(producer, numRecords, tp)
+
+val topic1 = "tblablac" // matches subscribed pattern
+createTopic(topic1, 2, brokerCount)
+sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0))
+sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1))
+
+val topic2 = "tblablak" // does not match subscribed pattern
+createTopic(topic2, 2, brokerCount)
+sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 0))
+sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 1))
+
+val topic3 = "tblab1" // does not match subscribed pattern
+createTopic(topic3, 2, brokerCount)
+sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 0))
+sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 1))
+
+val consumer = createConsumer()
+assertEquals(0, consumer.assignment().size)
+
+val pattern = Pattern.compile("t.*c")
+consumer.subscribe(pattern, new TestConsumerReassignmentListener)
+
+var assignment = Set(
+  new TopicPartition(topic, 0),
+  new TopicPartition(topic, 1),
+  new TopicPartition(topic1, 0),
+  new TopicPartition(topic1, 1))
+awaitAssignment(consumer, assignment)
+
+val topic4 = "tsomec" // matches subscribed pattern
+createTopic(topic4, 2, brokerCount)
+sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 0))
+sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 1))
+
+assignment ++= Set(
+  new TopicPartition(topic4, 0),
+  new TopicPartition(topic4, 1))
+awaitAssignment(consumer, assignment)
+
+consumer.unsubscribe()
+assertEquals(0, consumer.assignment().size)
+  }
+
+  /**
+   * Verifies that a second call to pattern subscription succeeds and performs as expected.
+   * The initial subscription is to a pattern that matches two topics 'topic' and 'foo'.
+   * The second subscription is to a pattern that matches 'foo' and a new topic 'bar'.
+   * It is expected that the consumer is subscribed to all partitions of 'topic' and 'foo' after
+   * the first subscription, and to all partitions of 'foo' and 'bar' after the second.
+   * The metadata refresh interval is intentionally increased to a large enough value to guarantee
+   * that it is the subscription call that triggers a metadata refresh, and not the timeout.
+   */
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubsequentPatternSubscription(quorum: String, groupProtocol: String): Unit = {
+this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "3")
+val consumer = createConsumer()
+
+val numRecords = 1
+val producer = createProducer()
+sendRecords(producer, numRecords = numRecords, tp)
+
+// the first topic ('topic')  matches first subscription pattern only
+
+val fooTopic = "foo" // matches both subscription patterns
+createTopic(fooTopic, 1, brokerCount)
+sendRecords(producer, numRecords = 1000, new TopicPartition(fooTopic, 0))
+
+assertEquals(0, consumer.assignment().size)
+
+val pattern1 = Pattern.compile(".*o.*") // only 'topic' and 'foo' match this
+consumer.subscribe(pattern1, new TestConsumerReassignmentListener)
+
+var assignment = Set(
+  new TopicPartition(topic, 0),
+  new TopicPartition(topic, 1),
+  new TopicPartition(fooTopic, 0))
+awaitAssignment(consumer, assignment)
+
+val barTopic = "bar" // matches the next subscription pattern
+createTopic(barTopic, 1, brokerCount)
+sendRecords(producer, numRecords = 

Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-02 Thread via GitHub


lianetm commented on PR #15585:
URL: https://github.com/apache/kafka/pull/15585#issuecomment-2032154288

   Hey @Phuc-Hong-Tran , any update on this one? Thanks!





Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-26 Thread via GitHub


Phuc-Hong-Tran commented on PR #15585:
URL: https://github.com/apache/kafka/pull/15585#issuecomment-2021544620

   Thanks for the comments @lianetm @kirktrue, I'll try to address those asap





Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-26 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1540153302


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, 
Optional

Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-26 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1540156766


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, 
Optional

Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-26 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1540149023


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, 
Optional

Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-26 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1540147833


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1667,6 +1667,9 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd
 public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
 maybeThrowFencedInstanceException();
 maybeInvokeCommitCallbacks();
+if (subscriptions.hasPatternSubscription()) {
+updatePatternSubscription(metadata.fetch());
+}

Review Comment:
   @lianetm, I understand. Will fix this part






Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-26 Thread via GitHub


Phuc-Hong-Tran commented on PR #15585:
URL: https://github.com/apache/kafka/pull/15585#issuecomment-2021526537

   @kirktrue, regarding 
https://github.com/apache/kafka/pull/15585#discussion_r1539447102, there is a 
race condition bug where the metadata is not updated but the heartbeat request 
is already created, but it lacks required info: in this case, the list of 
topic(s) that the user wants to subscribe to. I added the `while` loop there so 
that we only proceed to create a heartbeat request once we have received the 
newly updated metadata.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-26 Thread via GitHub


Phuc-Hong-Tran commented on PR #15585:
URL: https://github.com/apache/kafka/pull/15585#issuecomment-2021522732

   @kirktrue, regarding comment 
https://github.com/apache/kafka/pull/15585#discussion_r1539447785, I decided to 
call `metadata.requestUpdateForNewTopics` since there is quite a delay between 
calling the function and the metadata actually receiving the updated info, and 
we're sending the `SubscriptionChangeEvent` right after this. Calling the 
function makes sure we get the required metadata before a heartbeat request is 
created.





Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-26 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1539955531


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -550,6 +550,11 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 } else {
 // SubscribedTopicRegex - only sent if has changed since the last heartbeat
 //  - not supported yet
+TreeSet<String> subscribedTopicNames = new TreeSet<>(this.subscriptions.subscription());
+if (sendAllFields || !subscribedTopicNames.equals(sentFields.subscribedTopicNames)) {
+data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription()));
+sentFields.subscribedTopicNames = subscribedTopicNames;
+}

Review Comment:
   I guess the if/else was initially added to set up a structure with 2 paths:
   
   1- send the broker the explicit topic names to subscribe to (this includes 
subscribing with a topic list and with a Pattern, which the client resolves to a 
topic list)
   2- send the broker a regex, for the broker to resolve it (the new 
SubscriptionPattern)
   
   So, the key point is that, with the concepts above, up to and including this 
PR, we're still only supporting path 1. That's why I would say we'd better get 
rid of the if/else, and include it only when we implement the 2nd path, properly 
think about the condition to use, and add a body to the else.
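The "path 1" behavior can be sketched roughly as follows, assuming a tracker that remembers the last topic names sent. `SubscribedTopicNamesTracker` and `topicNamesToSend` are hypothetical names that loosely mirror the `sentFields.subscribedTopicNames` check in the diff; this does not reproduce `HeartbeatRequestManager`, and path 2 is intentionally left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of "path 1": the client sends the explicit topic names it
// resolved, and only when they changed since the last heartbeat (or when all
// fields must be sent). Path 2 (sending a regex for the broker to resolve) is
// deliberately absent, mirroring the suggestion to drop the empty else branch.
class SubscribedTopicNamesTracker {
    private TreeSet<String> lastSentTopicNames = null; // null: nothing sent yet

    // Returns the names to put in the next heartbeat, or null to leave the field unset.
    List<String> topicNamesToSend(Set<String> subscription, boolean sendAllFields) {
        TreeSet<String> current = new TreeSet<>(subscription);
        if (sendAllFields || !current.equals(lastSentTopicNames)) {
            lastSentTopicNames = current;
            return new ArrayList<>(current);
        }
        return null;
    }
}
```

Copying into a `TreeSet` makes the comparison order-insensitive, so a reordered but otherwise identical subscription does not trigger a resend.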






Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-26 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1539955531


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -550,6 +550,11 @@ public ConsumerGroupHeartbeatRequestData 
buildRequestData() {
 } else {
 // SubscribedTopicRegex - only sent if has changed since the 
last heartbeat
 //  - not supported yet
+TreeSet subscribedTopicNames = new 
TreeSet<>(this.subscriptions.subscription());
+if (sendAllFields || 
!subscribedTopicNames.equals(sentFields.subscribedTopicNames)) {
+data.setSubscribedTopicNames(new 
ArrayList<>(this.subscriptions.subscription()));
+sentFields.subscribedTopicNames = subscribedTopicNames;
+}

Review Comment:
   I guess the if/else was initially added to set up the structure for the 
2 paths there will be:
   1- send the broker the explicit topic names to subscribe to
   2- send the broker a regex, for the broker to resolve it
   So, the key point is that, with the concepts above, up to this PR (included), 
we're still only supporting path 1. That's why I would say we had better get rid 
of the if/else, and add it back only when we actually implement the 2nd path, 
properly think about the condition to use, and add a body to the else.









Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-26 Thread via GitHub


kirktrue commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1539435022


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -550,6 +550,11 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 } else {
 // SubscribedTopicRegex - only sent if has changed since the last heartbeat
 //  - not supported yet

Review Comment:
   This comment would go away, if we do end up keeping a separate `else` block.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -550,6 +550,11 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 } else {
 // SubscribedTopicRegex - only sent if has changed since the last heartbeat
 //  - not supported yet
+TreeSet<String> subscribedTopicNames = new TreeSet<>(this.subscriptions.subscription());
+if (sendAllFields || !subscribedTopicNames.equals(sentFields.subscribedTopicNames)) {
+data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription()));
+sentFields.subscribedTopicNames = subscribedTopicNames;
+}

Review Comment:
   Thanks for catching this, @lianetm. I agree that if the logic is identical, 
there's no need for duplication. Any speculation as to the reason the `if` 
statement existed in the first place?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1667,6 +1667,9 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd
 public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
 maybeThrowFencedInstanceException();
 maybeInvokeCommitCallbacks();
+if (subscriptions.hasPatternSubscription()) {
+updatePatternSubscription(metadata.fetch());
+}

Review Comment:
   Good call, @lianetm.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, Optional

Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-25 Thread via GitHub


lianetm commented on PR #15585:
URL: https://github.com/apache/kafka/pull/15585#issuecomment-2018835239

   Hey @Phuc-Hong-Tran , thanks a lot for the PR! I had a first pass and left 
some comments.





Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-25 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1538180339


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1667,6 +1667,9 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd
 public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
 maybeThrowFencedInstanceException();
 maybeInvokeCommitCallbacks();
+if (subscriptions.hasPatternSubscription()) {
+updatePatternSubscription(metadata.fetch());
+}

Review Comment:
   Just for consistency, what if we encapsulate this in something like 
`maybeUpdateSubscriptionMetadata`? It would align nicely with the funcs above 
(and that's also how the similar functionality is named in the legacy 
coordinator, so it would help to understand how that piece of logic 
translates into the new consumer).
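A rough sketch of what a `maybeUpdateSubscriptionMetadata`-style helper might do for client-side Pattern resolution (path 1): match the pattern against the topic names currently known from metadata and report whether the resolved subscription changed. The class `PatternSubscriptionSketch`, the method body, and the use of a plain `Set<String>` of cluster topics are hypothetical and chosen for illustration only; this is not the actual `AsyncKafkaConsumer` or legacy coordinator code, and concerns a real implementation would need (for example internal topics) are ignored here.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical sketch of client-side Pattern resolution ("path 1").
// Not the real AsyncKafkaConsumer implementation.
public class PatternSubscriptionSketch {

    private final Pattern pattern;
    private Set<String> subscription = new TreeSet<>();

    PatternSubscriptionSketch(Pattern pattern) {
        this.pattern = pattern;
    }

    // Hypothetical helper named after the review suggestion. Resolves the pattern
    // against the topic names known from metadata and returns true only if the
    // resolved subscription changed.
    boolean maybeUpdateSubscriptionMetadata(Set<String> clusterTopics) {
        Set<String> matched = new TreeSet<>();
        for (String topic : clusterTopics) {
            // Matcher.matches() requires the whole topic name to match, not a substring
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        if (matched.equals(subscription)) {
            return false; // nothing changed, no need to update the subscription
        }
        subscription = matched;
        return true;
    }

    Set<String> subscription() {
        return subscription;
    }

    public static void main(String[] args) {
        PatternSubscriptionSketch s = new PatternSubscriptionSketch(Pattern.compile("orders-.*"));
        System.out.println(s.maybeUpdateSubscriptionMetadata(Set.of("orders-eu", "payments", "orders-us"))); // true
        System.out.println(s.subscription()); // [orders-eu, orders-us]
        System.out.println(s.maybeUpdateSubscriptionMetadata(Set.of("orders-us", "orders-eu"))); // false
    }
}
```

Returning a change flag keeps the "maybe" semantics: the caller only needs to act (for example, trigger a heartbeat with new topic names) when the resolved set actually differs.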






Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-25 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1538173798


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -550,6 +550,11 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 } else {
 // SubscribedTopicRegex - only sent if has changed since the last heartbeat
 //  - not supported yet
+TreeSet<String> subscribedTopicNames = new TreeSet<>(this.subscriptions.subscription());
+if (sendAllFields || !subscribedTopicNames.equals(sentFields.subscribedTopicNames)) {
+data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription()));
+sentFields.subscribedTopicNames = subscribedTopicNames;
+}

Review Comment:
   With this addition we end up with the exact same code repeated for the `if` 
and `else`, so I would say we should find a better way of doing this. The first 
solution that comes to mind is to remove the if/else. In the end, we have a 
single case to handle here: send explicit subscriptions (topic names) to the 
broker (from the HB Mgr POV and for the broker, it makes no difference whether the 
topic list came from a call to subscribe with topics or from a call to subscribe 
with a Pattern that we resolved internally on the client).
   
   When we take on the next task of supporting the new regex, we'll actually 
have to send something different here, so we can decide then how best to 
differentiate the 2 cases. For now, including this PR, we only support 1 case 
for the subscription content we send in the HB. Makes sense?
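To illustrate the single-path logic described above, here is a minimal Java sketch of the "send topic names only if they changed since the last heartbeat" check with the `if`/`else` removed. It is a hypothetical, simplified model: `SubscribedTopicsFieldSketch`, `HeartbeatData`, and `build` are names made up for illustration and are not the real `HeartbeatRequestManager` API; only the `sendAllFields`/`sentFields` idea is taken from the diff. The caller passes the already-resolved subscription, so it does not matter whether the names came from a topic list or from a Pattern resolved on the client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified model of the single "explicit topic names" path.
// Not the real HeartbeatRequestManager API.
public class SubscribedTopicsFieldSketch {

    // Stand-in for the heartbeat request data, reduced to the one field discussed here.
    static final class HeartbeatData {
        // null means the field is not included in this heartbeat
        List<String> subscribedTopicNames;
    }

    // Remembers what was last sent, so an unchanged subscription can be skipped.
    static final class SentFields {
        TreeSet<String> subscribedTopicNames;
    }

    private final SentFields sentFields = new SentFields();

    // One code path for both subscribe(topics) and a client-resolved Pattern:
    // the caller passes the already-resolved set of topic names.
    HeartbeatData build(Set<String> subscription, boolean sendAllFields) {
        HeartbeatData data = new HeartbeatData();
        TreeSet<String> subscribedTopicNames = new TreeSet<>(subscription);
        // equals(null) is false, so the very first heartbeat always includes the names
        if (sendAllFields || !subscribedTopicNames.equals(sentFields.subscribedTopicNames)) {
            data.subscribedTopicNames = new ArrayList<>(subscribedTopicNames);
            sentFields.subscribedTopicNames = subscribedTopicNames;
        }
        return data;
    }

    public static void main(String[] args) {
        SubscribedTopicsFieldSketch sketch = new SubscribedTopicsFieldSketch();
        System.out.println(sketch.build(Set.of("b", "a"), false).subscribedTopicNames); // [a, b]
        System.out.println(sketch.build(Set.of("a", "b"), false).subscribedTopicNames); // null
        System.out.println(sketch.build(Set.of("a", "b"), true).subscribedTopicNames);  // [a, b]
    }
}
```

Copying the subscription into a `TreeSet` makes the comparison order-insensitive and the sent list sorted. Like the diff, the sketch updates `sentFields` as soon as the request data is built rather than after a response arrives.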






Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-25 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1538161659


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -433,7 +433,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
*/
   // TODO: enable this test for the consumer group protocol when support for pattern subscriptions is implemented.

Review Comment:
   ditto



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -486,7 +486,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
*/
   // TODO: enable this test for the consumer group protocol when support for pattern subscriptions is implemented.

Review Comment:
   ditto






Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-25 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1538161334


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -374,7 +374,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
*/
   // TODO: enable this test for the consumer group protocol when support for pattern subscriptions is implemented.

Review Comment:
   we should remove this TODO now






Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-25 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1538158322


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, 
Optionalhttps://github.com/apache/kafka/blob/f8ce7feebcede6c6da93c649413a64695bc8d4c1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1466))
 . What do you think? 






[PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-22 Thread via GitHub


Phuc-Hong-Tran opened a new pull request, #15585:
URL: https://github.com/apache/kafka/pull/15585

   * Fully implemented subscription using Pattern for AsyncKafkaConsumer.
   * Enabled related tests for subscription using Pattern in 
PlaintextConsumerTest.

