This is an automated email from the ASF dual-hosted git repository. mimaison pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ba0db81e530 KAFKA-16246: Cleanups in ConsoleConsumer (#15457) ba0db81e530 is described below commit ba0db81e5307cf090dc5876f3c61ddbe5fef2284 Author: Dmitry Werner <grimekil...@gmail.com> AuthorDate: Thu Mar 7 13:39:16 2024 +0500 KAFKA-16246: Cleanups in ConsoleConsumer (#15457) Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Omnia Ibrahim <o.g.h.ibra...@gmail.com> --- .../kafka/tools/consumer/ConsoleConsumer.java | 51 +++------- .../tools/consumer/ConsoleConsumerOptions.java | 31 +++--- .../tools/consumer/ConsoleConsumerOptionsTest.java | 52 +++++++--- .../kafka/tools/consumer/ConsoleConsumerTest.java | 110 +++++++++++++-------- 4 files changed, 142 insertions(+), 102 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java index f84fb88c23f..bb5ab1443ed 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java @@ -22,8 +22,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Optional; -import java.util.OptionalInt; -import java.util.OptionalLong; import java.util.concurrent.CountDownLatch; import java.util.regex.Pattern; import java.util.Collections; @@ -68,11 +66,8 @@ public class ConsoleConsumer { public static void run(ConsoleConsumerOptions opts) { messageCount = 0; - long timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : Long.MAX_VALUE; Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(opts.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer()); - ConsumerWrapper consumerWrapper = opts.partitionArg().isPresent() - ? new ConsumerWrapper(Optional.of(opts.topicArg()), opts.partitionArg(), OptionalLong.of(opts.offsetArg()), Optional.empty(), consumer, timeoutMs) - : new ConsumerWrapper(Optional.of(opts.topicArg()), OptionalInt.empty(), OptionalLong.empty(), Optional.ofNullable(opts.includedTopicsArg()), consumer, timeoutMs); + ConsumerWrapper consumerWrapper = new ConsumerWrapper(opts, consumer); addShutdownHook(consumerWrapper, opts); @@ -148,43 +143,25 @@ public class ConsoleConsumer { } public static class ConsumerWrapper { - final Optional<String> topic; - final OptionalInt partitionId; - final OptionalLong offset; - final Optional<String> includedTopics; - final Consumer<byte[], byte[]> consumer; - final long timeoutMs; final Time time = Time.SYSTEM; + final long timeoutMs; + final Consumer<byte[], byte[]> consumer; Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator(); - public ConsumerWrapper(Optional<String> topic, - OptionalInt partitionId, - OptionalLong offset, - Optional<String> includedTopics, - Consumer<byte[], byte[]> consumer, - long timeoutMs) { - this.topic = topic; - this.partitionId = partitionId; - this.offset = offset; - this.includedTopics = includedTopics; + public ConsumerWrapper(ConsoleConsumerOptions opts, Consumer<byte[], byte[]> consumer) { this.consumer = consumer; - this.timeoutMs = timeoutMs; - - if (topic.isPresent() && partitionId.isPresent() && offset.isPresent() && !includedTopics.isPresent()) { - seek(topic.get(), partitionId.getAsInt(), offset.getAsLong()); - } else if (topic.isPresent() && partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) { - // default to latest if no offset is provided - seek(topic.get(), partitionId.getAsInt(), ListOffsetsRequest.LATEST_TIMESTAMP); - } else if (topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) { - consumer.subscribe(Collections.singletonList(topic.get())); - } else if (!topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && includedTopics.isPresent()) { - consumer.subscribe(Pattern.compile(includedTopics.get())); + timeoutMs = opts.timeoutMs(); + Optional<String> topic = opts.topicArg(); + + if (topic.isPresent()) { + if (opts.partitionArg().isPresent()) { + seek(topic.get(), opts.partitionArg().getAsInt(), opts.offsetArg()); + } else { + consumer.subscribe(Collections.singletonList(topic.get())); + } } else { - throw new IllegalArgumentException("An invalid combination of arguments is provided. " + - "Exactly one of 'topic' or 'include' must be provided. " + - "If 'topic' is provided, an optional 'partition' may also be provided. " + - "If 'partition' is provided, an optional 'offset' may also be provided, otherwise, consumption starts from the end of the partition."); + opts.includedTopicsArg().ifPresent(topics -> consumer.subscribe(Pattern.compile(topics))); } } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java index a713afb2bf2..aa379195154 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java @@ -34,7 +34,7 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Objects; +import java.util.Optional; import java.util.OptionalInt; import java.util.Properties; import java.util.Random; @@ -55,7 +55,7 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions { private final OptionSpec<String> messageFormatterConfigOpt; private final OptionSpec<?> resetBeginningOpt; private final OptionSpec<Integer> maxMessagesOpt; - private final OptionSpec<Integer> timeoutMsOpt; + private final OptionSpec<Long> timeoutMsOpt; private final OptionSpec<?> skipMessageOnErrorOpt; private final OptionSpec<String> bootstrapServerOpt; private final OptionSpec<String> keyDeserializerOpt; @@ -66,6 +66,7 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions { private final Properties consumerProps; private final long offset; + private final long timeoutMs; private final MessageFormatter formatter; public ConsoleConsumerOptions(String[] args) throws IOException { @@ -139,7 +140,7 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions { timeoutMsOpt = parser.accepts("timeout-ms", "If specified, exit if no message is available for consumption for the specified interval.") .withRequiredArg() .describedAs("timeout_ms") - .ofType(Integer.class); + .ofType(Long.class); skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " + "skip it instead of halt."); bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The server(s) to connect to.") @@ -184,12 +185,13 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions { Set<String> groupIdsProvided = checkConsumerGroup(consumerPropsFromFile, extraConsumerProps); consumerProps = buildConsumerProps(consumerPropsFromFile, extraConsumerProps, groupIdsProvided); offset = parseOffset(); + timeoutMs = parseTimeoutMs(); formatter = buildFormatter(); } private void checkRequiredArgs() { - List<String> topicOrFilterArgs = new ArrayList<>(Arrays.asList(topicArg(), includedTopicsArg())); - topicOrFilterArgs.removeIf(Objects::isNull); + List<Optional<String>> topicOrFilterArgs = new ArrayList<>(Arrays.asList(topicArg(), includedTopicsArg())); + topicOrFilterArgs.removeIf(arg -> !arg.isPresent()); // user need to specify value for either --topic or one of the include filters options (--include or --whitelist) if (topicOrFilterArgs.size() != 1) { CommandLineUtils.printUsageAndExit(parser, "Exactly one of --include/--topic is required. " + @@ -322,6 +324,11 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions { "'earliest', 'latest', or a non-negative long."); } + private long parseTimeoutMs() { + long timeout = options.has(timeoutMsOpt) ? options.valueOf(timeoutMsOpt) : -1; + return timeout >= 0 ? timeout : Long.MAX_VALUE; + } + private MessageFormatter buildFormatter() { MessageFormatter formatter = null; try { @@ -365,16 +372,16 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions { return OptionalInt.empty(); } - String topicArg() { - return options.valueOf(topicOpt); + Optional<String> topicArg() { + return options.has(topicOpt) ? Optional.of(options.valueOf(topicOpt)) : Optional.empty(); } int maxMessages() { return options.has(maxMessagesOpt) ? options.valueOf(maxMessagesOpt) : -1; } - int timeoutMs() { - return options.has(timeoutMsOpt) ? options.valueOf(timeoutMsOpt) : -1; + long timeoutMs() { + return timeoutMs; } boolean enableSystestEventsLogging() { @@ -385,10 +392,10 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions { return options.valueOf(bootstrapServerOpt); } - String includedTopicsArg() { + Optional<String> includedTopicsArg() { return options.has(includeOpt) - ? options.valueOf(includeOpt) - : options.valueOf(whitelistOpt); + ? Optional.of(options.valueOf(includeOpt)) + : Optional.ofNullable(options.valueOf(whitelistOpt)); } Properties formatterArgs() throws IOException { diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java index 523122c4cdd..3242b642cdb 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java @@ -48,12 +48,12 @@ public class ConsoleConsumerOptionsTest { ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); assertEquals("localhost:9092", config.bootstrapServer()); - assertEquals("test", config.topicArg()); + assertEquals("test", config.topicArg().orElse("")); assertTrue(config.fromBeginning()); assertFalse(config.enableSystestEventsLogging()); assertFalse(config.skipMessageOnError()); assertEquals(-1, config.maxMessages()); - assertEquals(-1, config.timeoutMs()); + assertEquals(Long.MAX_VALUE, config.timeoutMs()); } @Test @@ -67,7 +67,7 @@ public class ConsoleConsumerOptionsTest { ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); assertEquals("localhost:9092", config.bootstrapServer()); - assertEquals("includeTest*", config.includedTopicsArg()); + assertEquals("includeTest*", config.includedTopicsArg().orElse("")); assertTrue(config.fromBeginning()); } @@ -82,7 +82,7 @@ public class ConsoleConsumerOptionsTest { ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); assertEquals("localhost:9092", config.bootstrapServer()); - assertEquals("whitelistTest*", config.includedTopicsArg()); + assertEquals("whitelistTest*", config.includedTopicsArg().orElse("")); assertTrue(config.fromBeginning()); } @@ -96,7 +96,7 @@ public class ConsoleConsumerOptionsTest { }; ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); assertEquals("localhost:9092", config.bootstrapServer()); - assertEquals("includeTest*", config.includedTopicsArg()); + assertEquals("includeTest*", config.includedTopicsArg().orElse("")); assertTrue(config.fromBeginning()); } @@ -112,7 +112,7 @@ public class ConsoleConsumerOptionsTest { ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); assertEquals("localhost:9092", config.bootstrapServer()); - assertEquals("test", config.topicArg()); + assertEquals("test", config.topicArg().orElse("")); assertTrue(config.partitionArg().isPresent()); assertEquals(0, config.partitionArg().getAsInt()); assertEquals(3, config.offsetArg()); @@ -191,7 +191,7 @@ public class ConsoleConsumerOptionsTest { ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); assertEquals("localhost:9092", config.bootstrapServer()); - assertEquals("test", config.topicArg()); + assertEquals("test", config.topicArg().orElse("")); assertTrue(config.partitionArg().isPresent()); assertEquals(0, config.partitionArg().getAsInt()); assertEquals(-1, config.offsetArg()); @@ -211,7 +211,7 @@ public class ConsoleConsumerOptionsTest { Properties consumerProperties = config.consumerProps(); assertEquals("localhost:9092", config.bootstrapServer()); - assertEquals("test", config.topicArg()); + assertEquals("test", config.topicArg().orElse("")); assertFalse(config.fromBeginning()); assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); } @@ -228,7 +228,7 @@ public class ConsoleConsumerOptionsTest { Properties consumerProperties = config.consumerProps(); assertEquals("localhost:9092", config.bootstrapServer()); - assertEquals("test", config.topicArg()); + assertEquals("test", config.topicArg().orElse("")); assertFalse(config.fromBeginning()); assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); } @@ -246,7 +246,7 @@ public class ConsoleConsumerOptionsTest { Properties consumerProperties = config.consumerProps(); assertEquals("localhost:9092", config.bootstrapServer()); - assertEquals("test", config.topicArg()); + assertEquals("test", config.topicArg().orElse("")); assertTrue(config.fromBeginning()); assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); } @@ -262,7 +262,7 @@ public class ConsoleConsumerOptionsTest { Properties consumerProperties = config.consumerProps(); assertEquals("localhost:9092", config.bootstrapServer()); - assertEquals("test", config.topicArg()); + assertEquals("test", config.topicArg().orElse("")); assertFalse(config.fromBeginning()); assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); } @@ -442,7 +442,7 @@ public class ConsoleConsumerOptionsTest { ConsoleConsumerOptions config = new ConsoleConsumerOptions(args); assertEquals("localhost:9092", config.bootstrapServer()); - assertEquals("test", config.topicArg()); + assertEquals("test", config.topicArg().orElse("")); assertEquals(-2, config.offsetArg()); assertTrue(config.fromBeginning()); @@ -455,7 +455,7 @@ public class ConsoleConsumerOptionsTest { config = new ConsoleConsumerOptions(args); assertEquals("localhost:9092", config.bootstrapServer()); - assertEquals("test", config.topicArg()); + assertEquals("test", config.topicArg().orElse("")); assertEquals(-1, config.offsetArg()); assertFalse(config.fromBeginning()); } @@ -618,4 +618,30 @@ public class ConsoleConsumerOptionsTest { Exit.resetExitProcedure(); } } + + @Test + public void testParseTimeoutMs() throws Exception { + String[] withoutTimeoutMs = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--partition", "0" + }; + assertEquals(Long.MAX_VALUE, new ConsoleConsumerOptions(withoutTimeoutMs).timeoutMs()); + + String[] negativeTimeoutMs = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--partition", "0", + "--timeout-ms", "-100" + }; + assertEquals(Long.MAX_VALUE, new ConsoleConsumerOptions(negativeTimeoutMs).timeoutMs()); + + String[] validTimeoutMs = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--partition", "0", + "--timeout-ms", "100" + }; + assertEquals(100, new ConsoleConsumerOptions(validTimeoutMs).timeoutMs()); + } } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java index 008893f9c50..f4fa6ac3be2 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java @@ -24,21 +24,19 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.MessageFormatter; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.requests.ListOffsetsRequest; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.util.MockTime; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.io.PrintStream; import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.Optional; -import java.util.OptionalInt; -import java.util.OptionalLong; +import java.util.regex.Pattern; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -58,8 +56,7 @@ public class ConsoleConsumerTest { } @Test - public void shouldThrowTimeoutExceptionWhenTimeoutIsReached() { - String topic = "test"; + public void shouldThrowTimeoutExceptionWhenTimeoutIsReached() throws IOException { final Time time = new MockTime(); final int timeoutMs = 1000; @@ -71,20 +68,22 @@ public class ConsoleConsumerTest { return ConsumerRecords.EMPTY; }); + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--timeout-ms", String.valueOf(timeoutMs) + }; + ConsoleConsumer.ConsumerWrapper consumer = new ConsoleConsumer.ConsumerWrapper( - Optional.of(topic), - OptionalInt.empty(), - OptionalLong.empty(), - Optional.empty(), - mockConsumer, - timeoutMs + new ConsoleConsumerOptions(args), + mockConsumer ); assertThrows(TimeoutException.class, consumer::receive); } @Test - public void shouldResetUnConsumedOffsetsBeforeExit() { + public void shouldResetUnConsumedOffsetsBeforeExit() throws IOException { String topic = "test"; int maxMessages = 123; int totalMessages = 700; @@ -94,13 +93,16 @@ public class ConsoleConsumerTest { TopicPartition tp1 = new TopicPartition(topic, 0); TopicPartition tp2 = new TopicPartition(topic, 1); + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", topic, + "--timeout-ms", "1000" + }; + ConsoleConsumer.ConsumerWrapper consumer = new ConsoleConsumer.ConsumerWrapper( - Optional.of(topic), - OptionalInt.empty(), - OptionalLong.empty(), - Optional.empty(), - mockConsumer, - 1000L); + new ConsoleConsumerOptions(args), + mockConsumer + ); mockConsumer.rebalance(Arrays.asList(tp1, tp2)); Map<TopicPartition, Long> offsets = new HashMap<>(); @@ -165,47 +167,75 @@ public class ConsoleConsumerTest { @Test @SuppressWarnings("unchecked") - public void shouldSeekWhenOffsetIsSet() { + public void shouldSeekWhenOffsetIsSet() throws IOException { Consumer<byte[], byte[]> mockConsumer = mock(Consumer.class); TopicPartition tp0 = new TopicPartition("test", 0); + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", tp0.topic(), + "--partition", String.valueOf(tp0.partition()), + "--timeout-ms", "1000" + }; + ConsoleConsumer.ConsumerWrapper consumer = new ConsoleConsumer.ConsumerWrapper( - Optional.of(tp0.topic()), - OptionalInt.of(tp0.partition()), - OptionalLong.empty(), - Optional.empty(), - mockConsumer, - 1000L); + new ConsoleConsumerOptions(args), + mockConsumer + ); verify(mockConsumer).assign(eq(Collections.singletonList(tp0))); verify(mockConsumer).seekToEnd(eq(Collections.singletonList(tp0))); consumer.cleanup(); reset(mockConsumer); - consumer = new ConsoleConsumer.ConsumerWrapper( - Optional.of(tp0.topic()), - OptionalInt.of(tp0.partition()), - OptionalLong.of(123L), - Optional.empty(), - mockConsumer, - 1000L); + args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", tp0.topic(), + "--partition", String.valueOf(tp0.partition()), + "--offset", "123", + "--timeout-ms", "1000" + }; + + consumer = new ConsoleConsumer.ConsumerWrapper(new ConsoleConsumerOptions(args), mockConsumer); verify(mockConsumer).assign(eq(Collections.singletonList(tp0))); verify(mockConsumer).seek(eq(tp0), eq(123L)); consumer.cleanup(); reset(mockConsumer); - consumer = new ConsoleConsumer.ConsumerWrapper( - Optional.of(tp0.topic()), - OptionalInt.of(tp0.partition()), - OptionalLong.of(ListOffsetsRequest.EARLIEST_TIMESTAMP), - Optional.empty(), - mockConsumer, - 1000L); + args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", tp0.topic(), + "--partition", String.valueOf(tp0.partition()), + "--offset", "earliest", + "--timeout-ms", "1000" + }; + + consumer = new ConsoleConsumer.ConsumerWrapper(new ConsoleConsumerOptions(args), mockConsumer); verify(mockConsumer).assign(eq(Collections.singletonList(tp0))); verify(mockConsumer).seekToBeginning(eq(Collections.singletonList(tp0))); consumer.cleanup(); reset(mockConsumer); } + + @Test + @SuppressWarnings("unchecked") + public void shouldWorkWithoutTopicOption() throws IOException { + Consumer<byte[], byte[]> mockConsumer = mock(Consumer.class); + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--include", "includeTest*", + "--from-beginning" + }; + + ConsoleConsumer.ConsumerWrapper consumer = new ConsoleConsumer.ConsumerWrapper( + new ConsoleConsumerOptions(args), + mockConsumer + ); + + verify(mockConsumer).subscribe(any(Pattern.class)); + consumer.cleanup(); + } }