This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ba0db81e530 KAFKA-16246: Cleanups in ConsoleConsumer (#15457)
ba0db81e530 is described below

commit ba0db81e5307cf090dc5876f3c61ddbe5fef2284
Author: Dmitry Werner <grimekil...@gmail.com>
AuthorDate: Thu Mar 7 13:39:16 2024 +0500

    KAFKA-16246: Cleanups in ConsoleConsumer (#15457)
    
    
    Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Omnia Ibrahim <o.g.h.ibra...@gmail.com>
---
 .../kafka/tools/consumer/ConsoleConsumer.java      |  51 +++-------
 .../tools/consumer/ConsoleConsumerOptions.java     |  31 +++---
 .../tools/consumer/ConsoleConsumerOptionsTest.java |  52 +++++++---
 .../kafka/tools/consumer/ConsoleConsumerTest.java  | 110 +++++++++++++--------
 4 files changed, 142 insertions(+), 102 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
index f84fb88c23f..bb5ab1443ed 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
@@ -22,8 +22,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
-import java.util.OptionalInt;
-import java.util.OptionalLong;
 import java.util.concurrent.CountDownLatch;
 import java.util.regex.Pattern;
 import java.util.Collections;
@@ -68,11 +66,8 @@ public class ConsoleConsumer {
 
     public static void run(ConsoleConsumerOptions opts) {
         messageCount = 0;
-        long timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : 
Long.MAX_VALUE;
         Consumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(opts.consumerProps(), new ByteArrayDeserializer(), new 
ByteArrayDeserializer());
-        ConsumerWrapper consumerWrapper = opts.partitionArg().isPresent()
-            ? new ConsumerWrapper(Optional.of(opts.topicArg()), 
opts.partitionArg(), OptionalLong.of(opts.offsetArg()), Optional.empty(), 
consumer, timeoutMs)
-            : new ConsumerWrapper(Optional.of(opts.topicArg()), 
OptionalInt.empty(), OptionalLong.empty(), 
Optional.ofNullable(opts.includedTopicsArg()), consumer, timeoutMs);
+        ConsumerWrapper consumerWrapper = new ConsumerWrapper(opts, consumer);
 
         addShutdownHook(consumerWrapper, opts);
 
@@ -148,43 +143,25 @@ public class ConsoleConsumer {
     }
 
     public static class ConsumerWrapper {
-        final Optional<String> topic;
-        final OptionalInt partitionId;
-        final OptionalLong offset;
-        final Optional<String> includedTopics;
-        final Consumer<byte[], byte[]> consumer;
-        final long timeoutMs;
         final Time time = Time.SYSTEM;
+        final long timeoutMs;
+        final Consumer<byte[], byte[]> consumer;
 
         Iterator<ConsumerRecord<byte[], byte[]>> recordIter = 
Collections.emptyIterator();
 
-        public ConsumerWrapper(Optional<String> topic,
-                               OptionalInt partitionId,
-                               OptionalLong offset,
-                               Optional<String> includedTopics,
-                               Consumer<byte[], byte[]> consumer,
-                               long timeoutMs) {
-            this.topic = topic;
-            this.partitionId = partitionId;
-            this.offset = offset;
-            this.includedTopics = includedTopics;
+        public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer<byte[], 
byte[]> consumer) {
             this.consumer = consumer;
-            this.timeoutMs = timeoutMs;
-
-            if (topic.isPresent() && partitionId.isPresent() && 
offset.isPresent() && !includedTopics.isPresent()) {
-                seek(topic.get(), partitionId.getAsInt(), offset.getAsLong());
-            } else if (topic.isPresent() && partitionId.isPresent() && 
!offset.isPresent() && !includedTopics.isPresent()) {
-                // default to latest if no offset is provided
-                seek(topic.get(), partitionId.getAsInt(), 
ListOffsetsRequest.LATEST_TIMESTAMP);
-            } else if (topic.isPresent() && !partitionId.isPresent() && 
!offset.isPresent() && !includedTopics.isPresent()) {
-                consumer.subscribe(Collections.singletonList(topic.get()));
-            } else if (!topic.isPresent() && !partitionId.isPresent() && 
!offset.isPresent() && includedTopics.isPresent()) {
-                consumer.subscribe(Pattern.compile(includedTopics.get()));
+            timeoutMs = opts.timeoutMs();
+            Optional<String> topic = opts.topicArg();
+
+            if (topic.isPresent()) {
+                if (opts.partitionArg().isPresent()) {
+                    seek(topic.get(), opts.partitionArg().getAsInt(), 
opts.offsetArg());
+                } else {
+                    consumer.subscribe(Collections.singletonList(topic.get()));
+                }
             } else {
-                throw new IllegalArgumentException("An invalid combination of 
arguments is provided. " +
-                        "Exactly one of 'topic' or 'include' must be provided. 
" +
-                        "If 'topic' is provided, an optional 'partition' may 
also be provided. " +
-                        "If 'partition' is provided, an optional 'offset' may 
also be provided, otherwise, consumption starts from the end of the 
partition.");
+                opts.includedTopicsArg().ifPresent(topics -> 
consumer.subscribe(Pattern.compile(topics)));
             }
         }
 
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
index a713afb2bf2..aa379195154 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
@@ -34,7 +34,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Objects;
+import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Properties;
 import java.util.Random;
@@ -55,7 +55,7 @@ public final class ConsoleConsumerOptions extends 
CommandDefaultOptions {
     private final OptionSpec<String> messageFormatterConfigOpt;
     private final OptionSpec<?> resetBeginningOpt;
     private final OptionSpec<Integer> maxMessagesOpt;
-    private final OptionSpec<Integer> timeoutMsOpt;
+    private final OptionSpec<Long> timeoutMsOpt;
     private final OptionSpec<?> skipMessageOnErrorOpt;
     private final OptionSpec<String> bootstrapServerOpt;
     private final OptionSpec<String> keyDeserializerOpt;
@@ -66,6 +66,7 @@ public final class ConsoleConsumerOptions extends 
CommandDefaultOptions {
 
     private final Properties consumerProps;
     private final long offset;
+    private final long timeoutMs;
     private final MessageFormatter formatter;
 
     public ConsoleConsumerOptions(String[] args) throws IOException {
@@ -139,7 +140,7 @@ public final class ConsoleConsumerOptions extends 
CommandDefaultOptions {
         timeoutMsOpt = parser.accepts("timeout-ms", "If specified, exit if no 
message is available for consumption for the specified interval.")
                 .withRequiredArg()
                 .describedAs("timeout_ms")
-                .ofType(Integer.class);
+                .ofType(Long.class);
         skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If 
there is an error when processing a message, " +
                 "skip it instead of halt.");
         bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The 
server(s) to connect to.")
@@ -184,12 +185,13 @@ public final class ConsoleConsumerOptions extends 
CommandDefaultOptions {
         Set<String> groupIdsProvided = 
checkConsumerGroup(consumerPropsFromFile, extraConsumerProps);
         consumerProps = buildConsumerProps(consumerPropsFromFile, 
extraConsumerProps, groupIdsProvided);
         offset = parseOffset();
+        timeoutMs = parseTimeoutMs();
         formatter = buildFormatter();
     }
 
     private void checkRequiredArgs() {
-        List<String> topicOrFilterArgs = new 
ArrayList<>(Arrays.asList(topicArg(), includedTopicsArg()));
-        topicOrFilterArgs.removeIf(Objects::isNull);
+        List<Optional<String>> topicOrFilterArgs = new 
ArrayList<>(Arrays.asList(topicArg(), includedTopicsArg()));
+        topicOrFilterArgs.removeIf(arg -> !arg.isPresent());
         // user need to specify value for either --topic or one of the include 
filters options (--include or --whitelist)
         if (topicOrFilterArgs.size() != 1) {
             CommandLineUtils.printUsageAndExit(parser, "Exactly one of 
--include/--topic is required. " +
@@ -322,6 +324,11 @@ public final class ConsoleConsumerOptions extends 
CommandDefaultOptions {
                 "'earliest', 'latest', or a non-negative long.");
     }
 
+    private long parseTimeoutMs() {
+        long timeout = options.has(timeoutMsOpt) ? 
options.valueOf(timeoutMsOpt) : -1;
+        return timeout >= 0 ? timeout : Long.MAX_VALUE;
+    }
+
     private MessageFormatter buildFormatter() {
         MessageFormatter formatter = null;
         try {
@@ -365,16 +372,16 @@ public final class ConsoleConsumerOptions extends 
CommandDefaultOptions {
         return OptionalInt.empty();
     }
 
-    String topicArg() {
-        return options.valueOf(topicOpt);
+    Optional<String> topicArg() {
+        return options.has(topicOpt) ? Optional.of(options.valueOf(topicOpt)) 
: Optional.empty();
     }
 
     int maxMessages() {
         return options.has(maxMessagesOpt) ? options.valueOf(maxMessagesOpt) : 
-1;
     }
 
-    int timeoutMs() {
-        return options.has(timeoutMsOpt) ? options.valueOf(timeoutMsOpt) : -1;
+    long timeoutMs() {
+        return timeoutMs;
     }
 
     boolean enableSystestEventsLogging() {
@@ -385,10 +392,10 @@ public final class ConsoleConsumerOptions extends 
CommandDefaultOptions {
         return options.valueOf(bootstrapServerOpt);
     }
 
-    String includedTopicsArg() {
+    Optional<String> includedTopicsArg() {
         return options.has(includeOpt)
-                ? options.valueOf(includeOpt)
-                : options.valueOf(whitelistOpt);
+                ? Optional.of(options.valueOf(includeOpt))
+                : Optional.ofNullable(options.valueOf(whitelistOpt));
     }
 
     Properties formatterArgs() throws IOException {
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java
index 523122c4cdd..3242b642cdb 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java
@@ -48,12 +48,12 @@ public class ConsoleConsumerOptionsTest {
         ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
 
         assertEquals("localhost:9092", config.bootstrapServer());
-        assertEquals("test", config.topicArg());
+        assertEquals("test", config.topicArg().orElse(""));
         assertTrue(config.fromBeginning());
         assertFalse(config.enableSystestEventsLogging());
         assertFalse(config.skipMessageOnError());
         assertEquals(-1, config.maxMessages());
-        assertEquals(-1, config.timeoutMs());
+        assertEquals(Long.MAX_VALUE, config.timeoutMs());
     }
 
     @Test
@@ -67,7 +67,7 @@ public class ConsoleConsumerOptionsTest {
         ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
 
         assertEquals("localhost:9092", config.bootstrapServer());
-        assertEquals("includeTest*", config.includedTopicsArg());
+        assertEquals("includeTest*", config.includedTopicsArg().orElse(""));
         assertTrue(config.fromBeginning());
     }
 
@@ -82,7 +82,7 @@ public class ConsoleConsumerOptionsTest {
         ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
 
         assertEquals("localhost:9092", config.bootstrapServer());
-        assertEquals("whitelistTest*", config.includedTopicsArg());
+        assertEquals("whitelistTest*", config.includedTopicsArg().orElse(""));
         assertTrue(config.fromBeginning());
     }
 
@@ -96,7 +96,7 @@ public class ConsoleConsumerOptionsTest {
         };
         ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
         assertEquals("localhost:9092", config.bootstrapServer());
-        assertEquals("includeTest*", config.includedTopicsArg());
+        assertEquals("includeTest*", config.includedTopicsArg().orElse(""));
         assertTrue(config.fromBeginning());
     }
 
@@ -112,7 +112,7 @@ public class ConsoleConsumerOptionsTest {
         ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
 
         assertEquals("localhost:9092", config.bootstrapServer());
-        assertEquals("test", config.topicArg());
+        assertEquals("test", config.topicArg().orElse(""));
         assertTrue(config.partitionArg().isPresent());
         assertEquals(0, config.partitionArg().getAsInt());
         assertEquals(3, config.offsetArg());
@@ -191,7 +191,7 @@ public class ConsoleConsumerOptionsTest {
         ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
 
         assertEquals("localhost:9092", config.bootstrapServer());
-        assertEquals("test", config.topicArg());
+        assertEquals("test", config.topicArg().orElse(""));
         assertTrue(config.partitionArg().isPresent());
         assertEquals(0, config.partitionArg().getAsInt());
         assertEquals(-1, config.offsetArg());
@@ -211,7 +211,7 @@ public class ConsoleConsumerOptionsTest {
         Properties consumerProperties = config.consumerProps();
 
         assertEquals("localhost:9092", config.bootstrapServer());
-        assertEquals("test", config.topicArg());
+        assertEquals("test", config.topicArg().orElse(""));
         assertFalse(config.fromBeginning());
         assertEquals("latest", 
consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
     }
@@ -228,7 +228,7 @@ public class ConsoleConsumerOptionsTest {
         Properties consumerProperties = config.consumerProps();
 
         assertEquals("localhost:9092", config.bootstrapServer());
-        assertEquals("test", config.topicArg());
+        assertEquals("test", config.topicArg().orElse(""));
         assertFalse(config.fromBeginning());
         assertEquals("earliest", 
consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
     }
@@ -246,7 +246,7 @@ public class ConsoleConsumerOptionsTest {
         Properties consumerProperties = config.consumerProps();
 
         assertEquals("localhost:9092", config.bootstrapServer());
-        assertEquals("test", config.topicArg());
+        assertEquals("test", config.topicArg().orElse(""));
         assertTrue(config.fromBeginning());
         assertEquals("earliest", 
consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
     }
@@ -262,7 +262,7 @@ public class ConsoleConsumerOptionsTest {
         Properties consumerProperties = config.consumerProps();
 
         assertEquals("localhost:9092", config.bootstrapServer());
-        assertEquals("test", config.topicArg());
+        assertEquals("test", config.topicArg().orElse(""));
         assertFalse(config.fromBeginning());
         assertEquals("latest", 
consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
     }
@@ -442,7 +442,7 @@ public class ConsoleConsumerOptionsTest {
 
         ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
         assertEquals("localhost:9092", config.bootstrapServer());
-        assertEquals("test", config.topicArg());
+        assertEquals("test", config.topicArg().orElse(""));
         assertEquals(-2, config.offsetArg());
         assertTrue(config.fromBeginning());
 
@@ -455,7 +455,7 @@ public class ConsoleConsumerOptionsTest {
 
         config = new ConsoleConsumerOptions(args);
         assertEquals("localhost:9092", config.bootstrapServer());
-        assertEquals("test", config.topicArg());
+        assertEquals("test", config.topicArg().orElse(""));
         assertEquals(-1, config.offsetArg());
         assertFalse(config.fromBeginning());
     }
@@ -618,4 +618,30 @@ public class ConsoleConsumerOptionsTest {
             Exit.resetExitProcedure();
         }
     }
+
+    @Test
+    public void testParseTimeoutMs() throws Exception {
+        String[] withoutTimeoutMs = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--partition", "0"
+        };
+        assertEquals(Long.MAX_VALUE, new 
ConsoleConsumerOptions(withoutTimeoutMs).timeoutMs());
+
+        String[] negativeTimeoutMs = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--partition", "0",
+            "--timeout-ms", "-100"
+        };
+        assertEquals(Long.MAX_VALUE, new 
ConsoleConsumerOptions(negativeTimeoutMs).timeoutMs());
+
+        String[] validTimeoutMs = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--partition", "0",
+            "--timeout-ms", "100"
+        };
+        assertEquals(100, new 
ConsoleConsumerOptions(validTimeoutMs).timeoutMs());
+    }
 }
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java
index 008893f9c50..f4fa6ac3be2 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java
@@ -24,21 +24,19 @@ import 
org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.MessageFormatter;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.util.MockTime;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.io.PrintStream;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
-import java.util.OptionalInt;
-import java.util.OptionalLong;
+import java.util.regex.Pattern;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -58,8 +56,7 @@ public class ConsoleConsumerTest {
     }
 
     @Test
-    public void shouldThrowTimeoutExceptionWhenTimeoutIsReached() {
-        String topic = "test";
+    public void shouldThrowTimeoutExceptionWhenTimeoutIsReached() throws 
IOException {
         final Time time = new MockTime();
         final int timeoutMs = 1000;
 
@@ -71,20 +68,22 @@ public class ConsoleConsumerTest {
             return ConsumerRecords.EMPTY;
         });
 
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--timeout-ms", String.valueOf(timeoutMs)
+        };
+
         ConsoleConsumer.ConsumerWrapper consumer = new 
ConsoleConsumer.ConsumerWrapper(
-                Optional.of(topic),
-                OptionalInt.empty(),
-                OptionalLong.empty(),
-                Optional.empty(),
-                mockConsumer,
-                timeoutMs
+            new ConsoleConsumerOptions(args),
+            mockConsumer
         );
 
         assertThrows(TimeoutException.class, consumer::receive);
     }
 
     @Test
-    public void shouldResetUnConsumedOffsetsBeforeExit() {
+    public void shouldResetUnConsumedOffsetsBeforeExit() throws IOException {
         String topic = "test";
         int maxMessages = 123;
         int totalMessages = 700;
@@ -94,13 +93,16 @@ public class ConsoleConsumerTest {
         TopicPartition tp1 = new TopicPartition(topic, 0);
         TopicPartition tp2 = new TopicPartition(topic, 1);
 
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", topic,
+            "--timeout-ms", "1000"
+        };
+
         ConsoleConsumer.ConsumerWrapper consumer = new 
ConsoleConsumer.ConsumerWrapper(
-                Optional.of(topic),
-                OptionalInt.empty(),
-                OptionalLong.empty(),
-                Optional.empty(),
-                mockConsumer,
-                1000L);
+            new ConsoleConsumerOptions(args),
+            mockConsumer
+        );
 
         mockConsumer.rebalance(Arrays.asList(tp1, tp2));
         Map<TopicPartition, Long> offsets = new HashMap<>();
@@ -165,47 +167,75 @@ public class ConsoleConsumerTest {
 
     @Test
     @SuppressWarnings("unchecked")
-    public void shouldSeekWhenOffsetIsSet() {
+    public void shouldSeekWhenOffsetIsSet() throws IOException {
         Consumer<byte[], byte[]> mockConsumer = mock(Consumer.class);
         TopicPartition tp0 = new TopicPartition("test", 0);
 
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", tp0.topic(),
+            "--partition", String.valueOf(tp0.partition()),
+            "--timeout-ms", "1000"
+        };
+
         ConsoleConsumer.ConsumerWrapper consumer = new 
ConsoleConsumer.ConsumerWrapper(
-                Optional.of(tp0.topic()),
-                OptionalInt.of(tp0.partition()),
-                OptionalLong.empty(),
-                Optional.empty(),
-                mockConsumer,
-                1000L);
+            new ConsoleConsumerOptions(args),
+            mockConsumer
+        );
 
         verify(mockConsumer).assign(eq(Collections.singletonList(tp0)));
         verify(mockConsumer).seekToEnd(eq(Collections.singletonList(tp0)));
         consumer.cleanup();
         reset(mockConsumer);
 
-        consumer = new ConsoleConsumer.ConsumerWrapper(
-                Optional.of(tp0.topic()),
-                OptionalInt.of(tp0.partition()),
-                OptionalLong.of(123L),
-                Optional.empty(),
-                mockConsumer,
-                1000L);
+        args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", tp0.topic(),
+            "--partition", String.valueOf(tp0.partition()),
+            "--offset", "123",
+            "--timeout-ms", "1000"
+        };
+
+        consumer = new ConsoleConsumer.ConsumerWrapper(new 
ConsoleConsumerOptions(args), mockConsumer);
 
         verify(mockConsumer).assign(eq(Collections.singletonList(tp0)));
         verify(mockConsumer).seek(eq(tp0), eq(123L));
         consumer.cleanup();
         reset(mockConsumer);
 
-        consumer = new ConsoleConsumer.ConsumerWrapper(
-                Optional.of(tp0.topic()),
-                OptionalInt.of(tp0.partition()),
-                OptionalLong.of(ListOffsetsRequest.EARLIEST_TIMESTAMP),
-                Optional.empty(),
-                mockConsumer,
-                1000L);
+        args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", tp0.topic(),
+            "--partition", String.valueOf(tp0.partition()),
+            "--offset", "earliest",
+            "--timeout-ms", "1000"
+        };
+
+        consumer = new ConsoleConsumer.ConsumerWrapper(new 
ConsoleConsumerOptions(args), mockConsumer);
 
         verify(mockConsumer).assign(eq(Collections.singletonList(tp0)));
         
verify(mockConsumer).seekToBeginning(eq(Collections.singletonList(tp0)));
         consumer.cleanup();
         reset(mockConsumer);
     }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldWorkWithoutTopicOption() throws IOException {
+        Consumer<byte[], byte[]> mockConsumer = mock(Consumer.class);
+
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--include", "includeTest*",
+            "--from-beginning"
+        };
+
+        ConsoleConsumer.ConsumerWrapper consumer = new 
ConsoleConsumer.ConsumerWrapper(
+            new ConsoleConsumerOptions(args),
+            mockConsumer
+        );
+
+        verify(mockConsumer).subscribe(any(Pattern.class));
+        consumer.cleanup();
+    }
 }

Reply via email to