jlprat commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1478135674


##########
tools/src/main/java/org/apache/kafka/tools/consumer/LoggingMessageFormatter.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.record.TimestampType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+class LoggingMessageFormatter implements MessageFormatter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LoggingMessageFormatter.class);
+    private final DefaultMessageFormatter defaultWriter = new DefaultMessageFormatter();
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        defaultWriter.configure(configs);
+    }
+
+    @Override
+    public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+        defaultWriter.writeTo(consumerRecord, output);
+        String timestamp = consumerRecord.timestampType() != TimestampType.NO_TIMESTAMP_TYPE
+                ? consumerRecord.timestampType() + ":" + consumerRecord.timestamp() + " "

Review Comment:
   The trailing comma after the timestamp is missing for the case where a timestamp is present:
   ```suggestion
                   ? consumerRecord.timestampType() + ":" + consumerRecord.timestamp() + ", "
   ```



##########
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java:
##########
@@ -1139,8 +1140,7 @@ private <K, V> String readWindowedKeyedMessagesViaConsoleConsumer(final Deserial
                 "--property", "key.deserializer.window.size.ms=500",
             };
 
-            ConsoleConsumer.messageCount_$eq(0); //reset the message count

Review Comment:
   Why is this no longer needed with the new code?



##########
tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java:
##########
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.test.MockDeserializer;
+import org.apache.kafka.tools.ToolsTestUtils;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsoleConsumerOptionsTest {
+
+    @Test
+    public void shouldParseValidConsumerValidConfig() throws IOException {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--from-beginning"
+        };
+
+        ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+
+        assertEquals("localhost:9092", config.bootstrapServer());
+        assertEquals("test", config.topicArg());
+        assertTrue(config.fromBeginning());
+        assertFalse(config.enableSystestEventsLogging());
+        assertFalse(config.skipMessageOnError());
+        assertEquals(-1, config.maxMessages());
+        assertEquals(-1, config.timeoutMs());

Review Comment:
   These were not present in the Scala code, but I guess it's fine to add them.



##########
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.CountDownLatch;
+import java.util.regex.Pattern;
+import java.util.Collections;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Consumer that dumps messages to standard out.
+ */
+public class ConsoleConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConsoleConsumer.class);
+    private static final CountDownLatch SHUTDOWN_LATCH = new CountDownLatch(1);
+
+    static int messageCount = 0;
+
+    public static void main(String[] args) throws Exception {
+        ConsoleConsumerOptions opts = new ConsoleConsumerOptions(args);
+        try {
+            run(opts);
+        } catch (AuthenticationException ae) {
+            LOG.error("Authentication failed: terminating consumer process", ae);
+            Exit.exit(1);
+        } catch (Throwable t) {
+            LOG.error("Unknown error when running consumer: ", t);
+            Exit.exit(1);
+        }
+    }
+
+    public static void run(ConsoleConsumerOptions opts) {
+        messageCount = 0;
+        long timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : Long.MAX_VALUE;
+        Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(opts.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        ConsumerWrapper consumerWrapper = opts.partitionArg().isPresent()
+            ? new ConsumerWrapper(Optional.of(opts.topicArg()), opts.partitionArg(), OptionalLong.of(opts.offsetArg()), Optional.empty(), consumer, timeoutMs)
+            : new ConsumerWrapper(Optional.of(opts.topicArg()), OptionalInt.empty(), OptionalLong.empty(), Optional.ofNullable(opts.includedTopicsArg()), consumer, timeoutMs);
+
+        addShutdownHook(consumerWrapper, opts);
+
+        try {
+            process(opts.maxMessages(), opts.formatter(), consumerWrapper, System.out, opts.skipMessageOnError());
+        } finally {
+            consumerWrapper.cleanup();
+            opts.formatter().close();
+            reportRecordCount();
+
+            SHUTDOWN_LATCH.countDown();
+        }
+    }
+
+    static void addShutdownHook(ConsumerWrapper consumer, ConsoleConsumerOptions conf) {
+        Exit.addShutdownHook("consumer-shutdown-hook", () -> {
+            try {
+                consumer.wakeup();
+                SHUTDOWN_LATCH.await();
+            } catch (Throwable t) {
+                LOG.error("Exception while running shutdown hook " + t.getMessage());

Review Comment:
   Line 64 logs the whole exception, whereas here only the message is logged. I think it would be good to log the exception here as well:
   ```suggestion
                   LOG.error("Exception while running shutdown hook ", t);
   ```
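
   To illustrate what is lost when only `getMessage()` is concatenated into the log line, here is a minimal sketch using plain JDK classes as a stand-in for the logger. The class `ShutdownHookLoggingSketch` and both helpers are hypothetical and not part of this PR; `withThrowable` only approximates what a logging backend typically renders when the throwable is passed as the last argument.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

public class ShutdownHookLoggingSketch {

    // Mirrors the current line: only the exception message is appended.
    static String messageOnly(Throwable t) {
        return "Exception while running shutdown hook " + t.getMessage();
    }

    // Approximates logging the throwable itself: type, message and stack trace.
    static String withThrowable(Throwable t) {
        StringWriter buffer = new StringWriter();
        t.printStackTrace(new PrintWriter(buffer, true));
        return "Exception while running shutdown hook " + buffer;
    }

    public static void main(String[] args) {
        // An exception created without a message, e.g. from an interrupted await().
        Throwable t = new InterruptedException();
        System.out.println(messageOnly(t));   // the message part is just "null"
        System.out.println(withThrowable(t)); // includes java.lang.InterruptedException and the trace
    }
}
```

   For an exception without a message, the first variant gives no hint about what actually failed, which is the main argument for passing `t` to the logger.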



##########
tools/src/main/java/org/apache/kafka/tools/consumer/LoggingMessageFormatter.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.record.TimestampType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+class LoggingMessageFormatter implements MessageFormatter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LoggingMessageFormatter.class);
+    private final DefaultMessageFormatter defaultWriter = new DefaultMessageFormatter();
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        defaultWriter.configure(configs);
+    }
+
+    @Override
+    public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+        defaultWriter.writeTo(consumerRecord, output);
+        String timestamp = consumerRecord.timestampType() != TimestampType.NO_TIMESTAMP_TYPE
+                ? consumerRecord.timestampType() + ":" + consumerRecord.timestamp() + " "
+                : "";
+        String key = consumerRecord.key() == null ? "null" : new String(consumerRecord.key(), StandardCharsets.UTF_8);
+        String value = consumerRecord.value() == null ? "null" : new String(consumerRecord.value(), StandardCharsets.UTF_8);

Review Comment:
   These two lines are not equivalent to the Scala code: the `key` and `value` strings are missing the "key:" and "value:" prefixes, respectively. The trailing comma after `key` is also missing.
   ```suggestion
           String key = "key:" + (consumerRecord.key() == null ? "null" : new String(consumerRecord.key(), StandardCharsets.UTF_8)) + ",";
           String value = "value:" + (consumerRecord.value() == null ? "null" : new String(consumerRecord.value(), StandardCharsets.UTF_8));
   ```
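
   A note on precedence when a prefix such as "key:" is concatenated in front of a ternary: in Java, `+` binds tighter than `==`, which in turn binds tighter than `?:`, so the ternary has to be parenthesized for the prefix to apply to both branches. The sketch below is only an illustration under that assumption; the class `LogLineSketch` and its helpers are hypothetical names, not code from this PR.

```java
import java.nio.charset.StandardCharsets;

public class LogLineSketch {

    // Hypothetical helper: renders "name:<bytes as UTF-8>" or "name:null".
    static String field(String name, byte[] bytes) {
        // The parentheses keep the whole ternary as one operand of '+'.
        return name + ":" + (bytes == null ? "null" : new String(bytes, StandardCharsets.UTF_8));
    }

    // Hypothetical helper: optional timestamp prefix, then key and value separated by ", ".
    static String logLine(String timestampPrefix, byte[] key, byte[] value) {
        return timestampPrefix + field("key", key) + ", " + field("value", value);
    }

    // Without parentheses this parses as ("key:" + bytes) == null ? "null" : new String(...).
    // The condition is never true, so the prefix is dropped and a null argument throws.
    static String unparenthesized(byte[] bytes) {
        return "key:" + bytes == null ? "null" : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] key = "k1".getBytes(StandardCharsets.UTF_8);
        System.out.println(logLine("CreateTime:42, ", key, null)); // CreateTime:42, key:k1, value:null
        System.out.println(logLine("", null, null));               // key:null, value:null
        System.out.println(unparenthesized(key));                  // k1 (prefix lost)
    }
}
```

   Calling `unparenthesized(null)` would throw a `NullPointerException` rather than return "null", which is why the parenthesized form is the safer shape for both the key and the value.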



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
