wernerdv commented on code in PR #15457:
URL: https://github.com/apache/kafka/pull/15457#discussion_r1514949575


##########
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##########
@@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) {
     }
 
     public static class ConsumerWrapper {
-        final Optional<String> topic;
-        final OptionalInt partitionId;
-        final OptionalLong offset;
-        final Optional<String> includedTopics;
-        final Consumer<byte[], byte[]> consumer;
-        final long timeoutMs;
         final Time time = Time.SYSTEM;
+        final long timeoutMs;
+        final Consumer<byte[], byte[]> consumer;
 
         Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
 
-        public ConsumerWrapper(Optional<String> topic,
-                               OptionalInt partitionId,
-                               OptionalLong offset,
-                               Optional<String> includedTopics,
-                               Consumer<byte[], byte[]> consumer,
-                               long timeoutMs) {
-            this.topic = topic;
-            this.partitionId = partitionId;
-            this.offset = offset;
-            this.includedTopics = includedTopics;
+        public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer<byte[], byte[]> consumer) {
+            Optional<String> topic = Optional.ofNullable(opts.topicArg());
+            Optional<String> includedTopics = Optional.ofNullable(opts.includedTopicsArg());
             this.consumer = consumer;
-            this.timeoutMs = timeoutMs;
-
-            if (topic.isPresent() && partitionId.isPresent() && offset.isPresent() && !includedTopics.isPresent()) {
-                seek(topic.get(), partitionId.getAsInt(), offset.getAsLong());
-            } else if (topic.isPresent() && partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) {
-                // default to latest if no offset is provided
-                seek(topic.get(), partitionId.getAsInt(), ListOffsetsRequest.LATEST_TIMESTAMP);
-            } else if (topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) {
-                consumer.subscribe(Collections.singletonList(topic.get()));
-            } else if (!topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && includedTopics.isPresent()) {
-                consumer.subscribe(Pattern.compile(includedTopics.get()));
+            timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : Long.MAX_VALUE;

Review Comment:
   Done.



##########
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##########
@@ -148,43 +143,26 @@ static boolean checkErr(PrintStream output) {
     }
 
     public static class ConsumerWrapper {
-        final Optional<String> topic;
-        final OptionalInt partitionId;
-        final OptionalLong offset;
-        final Optional<String> includedTopics;
-        final Consumer<byte[], byte[]> consumer;
-        final long timeoutMs;
         final Time time = Time.SYSTEM;
+        final long timeoutMs;
+        final Consumer<byte[], byte[]> consumer;
 
         Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
 
-        public ConsumerWrapper(Optional<String> topic,
-                               OptionalInt partitionId,
-                               OptionalLong offset,
-                               Optional<String> includedTopics,
-                               Consumer<byte[], byte[]> consumer,
-                               long timeoutMs) {
-            this.topic = topic;
-            this.partitionId = partitionId;
-            this.offset = offset;
-            this.includedTopics = includedTopics;
+        public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer<byte[], byte[]> consumer) {
+            Optional<String> topic = Optional.ofNullable(opts.topicArg());

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to