cadonna commented on code in PR #15188: URL: https://github.com/apache/kafka/pull/15188#discussion_r1472581740
########## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java: ########## @@ -753,6 +753,10 @@ public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { public void subscribe(Pattern pattern) { delegate.subscribe(pattern); } + @Override + public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {} Review Comment: This method needs javadocs describing what this method does. Please also add an empty line before the method. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1724,6 +1738,21 @@ private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListen } } + private void subscribeInternal(SubscriptionPattern pattern, Optional<ConsumerRebalanceListener> listener) { + acquireAndEnsureOpen(); + try { + maybeThrowInvalidGroupIdException(); + if (pattern == null || pattern.pattern().isEmpty()) + throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? + "null" : "empty")); + throwIfNoAssignorsConfigured(); + log.info("Subscribed to pattern: '{}'", pattern); Review Comment: This will probably not work since you did not add a `toString()` method to `SubscriptionPattern`. You should either add `toString()` to `SubscriptionPattern` or change this line to ```suggestion log.info("Subscribed to pattern: '{}'", pattern.pattern()); ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java: ########## @@ -72,6 +72,10 @@ public interface Consumer<K, V> extends Closeable { */ void subscribe(Pattern pattern); + void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback); Review Comment: This method needs a link to the javadocs in `KafkaConsumer`: ```suggestion /** * @see KafkaConsumer#subscribe(SubscriptionPattern, ConsumerRebalanceListener) */ void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback); ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java: ########## @@ -753,6 +753,10 @@ public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { public void subscribe(Pattern pattern) { delegate.subscribe(pattern); } + @Override + public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {} + @Override + public void subscribe(SubscriptionPattern pattern) {} Review Comment: This method needs javadocs describing what this method does. Please also add an empty line before the method. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java: ########## @@ -84,6 +85,9 @@ private enum SubscriptionType { /* the pattern user has requested */ private Pattern subscribedPattern; + /* we should rename this to something more specific */ + private SubscriptionPattern subscriptionPattern; Review Comment: Could you please also reset this field in `unsubscribe()` and add it to the return value of `toString()`? ########## clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java: ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +public class SubscriptionPattern { Review Comment: > This class would need some doc explaining what it represents. I definitely agree with @lianetm on this. > Also not sure if this is the right place for it (given that this whole intention with the new regex is driven by the broker, but is not implemented yet). So at this point is not clear to me if we would prefer to define this on the broker side to be used there? The KIP says this class should be added to the public API of the consumer. I agree with @Phuc-Hong-Tran that this class is merely there to differentiate patterns that are Google RE2/J from `java.util.regex.Pattern` as stated in the KIP. ########## clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java: ########## @@ -72,6 +72,10 @@ public interface Consumer<K, V> extends Closeable { */ void subscribe(Pattern pattern); + void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback); + + void subscribe(SubscriptionPattern pattern); Review Comment: This method needs a link to the javadocs in `KafkaConsumer`: ```suggestion /** * @see KafkaConsumer#subscribe(SubscriptionPattern) */ void subscribe(SubscriptionPattern pattern); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org