mumrah commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r574048761



##########
File path: shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig$;
+import kafka.server.KafkaConfig;
+import kafka.server.MetaProperties;
+import kafka.server.Server;
+import kafka.tools.TerseFailure;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.compat.java8.OptionConverters;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+
+/**
+ * The Kafka metadata tool.
+ */
+public final class MetadataShell {
+    private static final Logger log = 
LoggerFactory.getLogger(MetadataShell.class);
+
+    public static class Builder {
+        private String controllers;
+        private String configPath;
+        private File tempDir;
+        private String snapshotPath;
+
+        public Builder setControllers(String controllers) {
+            this.controllers = controllers;
+            return this;
+        }
+
+        public Builder setConfigPath(String configPath) {
+            this.configPath = configPath;
+            return this;
+        }
+
+        public Builder setSnapshotPath(String snapshotPath) {
+            this.snapshotPath = snapshotPath;
+            return this;
+        }
+
+        public Builder setTempDir(File tempDir) {
+            this.tempDir = tempDir;
+            return this;
+        }
+
+        public MetadataShell build() throws Exception {
+            if (snapshotPath != null) {
+                if (controllers != null) {
+                    throw new RuntimeException("If you specify a snapshot 
path, you " +

Review comment:
       Do we eventually want to support snapshots + connected mode? It seems 
like, in cases where we have a lot of records in the metadata log, this tool 
will take a while to initialize.

##########
File path: shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.apache.kafka.shell.MetadataNode.FileNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Maintains the in-memory metadata for the metadata tool.
+ */
+public final class MetadataNodeManager implements AutoCloseable {
+    private static final Logger log = 
LoggerFactory.getLogger(MetadataNodeManager.class);
+
+    public static class Data {
+        private final DirectoryNode root = new DirectoryNode();
+        private String workingDirectory = "/";
+
+        public DirectoryNode root() {
+            return root;
+        }
+
+        public String workingDirectory() {
+            return workingDirectory;
+        }
+
+        public void setWorkingDirectory(String workingDirectory) {
+            this.workingDirectory = workingDirectory;
+        }
+    }
+
+    class LogListener implements MetaLogListener, 
RaftClient.Listener<ApiMessageAndVersion> {
+        @Override
+        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+            try {
+                // TODO: handle lastOffset
+                while (reader.hasNext()) {
+                    BatchReader.Batch<ApiMessageAndVersion> batch = 
reader.next();
+                    for (ApiMessageAndVersion messageAndVersion : 
batch.records()) {
+                        handleMessage(messageAndVersion.message());
+                    }
+                }
+            } finally {
+                reader.close();
+            }
+        }
+
+        @Override
+        public void handleCommits(long lastOffset, List<ApiMessage> messages) {
+            appendEvent("handleCommits", () -> {
+                log.error("handleCommits " + messages + " at offset " + 
lastOffset);
+                DirectoryNode dir = data.root.mkdirs("metadataQuorum");
+                dir.create("offset").setContents(String.valueOf(lastOffset));
+                for (ApiMessage message : messages) {
+                    handleMessage(message);
+                }
+            }, null);
+        }
+
+        @Override
+        public void handleNewLeader(MetaLogLeader leader) {
+            appendEvent("handleNewLeader", () -> {
+                log.error("handleNewLeader " + leader);
+                DirectoryNode dir = data.root.mkdirs("metadataQuorum");
+                dir.create("leader").setContents(leader.toString());
+            }, null);
+        }
+
+        @Override
+        public void handleClaim(int epoch) {
+            // This shouldn't happen because we should never be the leader.
+            log.debug("RaftClient.Listener sent handleClaim(epoch=" + epoch + 
")");
+        }
+
+        @Override
+        public void handleRenounce(long epoch) {
+            // This shouldn't happen because we should never be the leader.
+            log.debug("MetaLogListener sent handleRenounce(epoch=" + epoch + 
")");
+        }
+
+        @Override
+        public void handleResign() {
+            // This shouldn't happen because we should never be the leader.
+            log.debug("MetaLogListener sent handleResign()");
+        }
+
+        @Override
+        public void beginShutdown() {
+            log.debug("MetaLogListener sent beginShutdown");
+        }
+    }
+
+    private final Data data = new Data();
+    private final LogListener logListener = new LogListener();
+    private final ObjectMapper objectMapper;
+    private final KafkaEventQueue queue;
+
+    public MetadataNodeManager() {
+        this.objectMapper = new ObjectMapper();
+        this.objectMapper.registerModule(new Jdk8Module());

Review comment:
       Out of curiosity, what is this module used for?

##########
File path: shell/src/main/java/org/apache/kafka/shell/SnapshotReader.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.record.ControlRecordType;
+import 
org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * The Kafka metadata tool.

Review comment:
       Is this javadoc accurate?

##########
File path: build.gradle
##########
@@ -1351,6 +1351,50 @@ project(':tools') {
   }
 }
 
+project(':shell') {

Review comment:
       Is "shell" the right name for this? I think "metadata shell" or 
"metashell" (like the branch name) is more descriptive of what it does. A Kafka 
"shell" makes me think of a tool that can do any of the normal command-line 
things to a Kafka cluster (create topics, modify configs, etc.).
   
   Do we ever intend to make this utility more than a read-only metadata 
explorer?
   
   Maybe we can call it something like "kafka-metadata.sh" or 
"kafka-metadata-shell.sh"? I'm curious what others think here as well. 

##########
File path: metadata/src/main/resources/common/metadata/PartitionRecord.json
##########
@@ -34,6 +34,8 @@
     { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
       "about": "The lead replica, or -1 if there is no leader." },
     { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": 
"-1",
-      "about": "An epoch that gets incremented each time we change the ISR." }
+      "about": "An epoch that gets incremented each time we change the 
partition leader." },

Review comment:
       Should this record change be part of this PR?

##########
File path: shell/src/main/java/org/apache/kafka/shell/CatCommandHandler.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.apache.kafka.shell.MetadataNode.FileNode;
+import org.jline.reader.Candidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Implements the cat command.
+ */
+public final class CatCommandHandler implements Commands.Handler {
+    private static final Logger log = 
LoggerFactory.getLogger(CatCommandHandler.class);
+
+    public final static Commands.Type TYPE = new CatCommandType();
+
+    public static class CatCommandType implements Commands.Type {
+        private CatCommandType() {
+        }
+
+        @Override
+        public String name() {
+            return "cat";
+        }
+
+        @Override
+        public String description() {
+            return "Show the contents of metadata nodes.";
+        }
+
+        @Override
+        public boolean shellOnly() {
+            return false;
+        }
+
+        @Override
+        public void addArguments(ArgumentParser parser) {
+            parser.addArgument("targets").
+                nargs("+").
+                help("The metadata nodes to display.");
+        }
+
+        @Override
+        public Commands.Handler createHandler(Namespace namespace) {
+            return new CatCommandHandler(namespace.getList("targets"));
+        }
+
+        @Override
+        public void completeNext(MetadataNodeManager nodeManager, List<String> 
nextWords,
+                                 List<Candidate> candidates) throws Exception {
+            CommandUtils.completePath(nodeManager, 
nextWords.get(nextWords.size() - 1),
+                candidates);
+        }
+    }
+
+    private final List<String> targets;
+
+    public CatCommandHandler(List<String> targets) {
+        this.targets = targets;

Review comment:
       nit: we should copy the argument into the private list rather than 
assigning the reference directly

##########
File path: metadata/src/main/resources/common/metadata/IsrChangeRecord.json
##########
@@ -28,6 +28,8 @@
     { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
       "about": "The lead replica, or -1 if there is no leader." },
     { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": 
"-1",
-      "about": "An epoch that gets incremented each time we change the ISR." }
+      "about": "An epoch that gets incremented each time we change the 
partition leader." },
+    { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": 
"-1",

Review comment:
       Should this record change be part of this PR?

##########
File path: core/src/main/scala/kafka/server/Server.scala
##########
@@ -29,6 +29,7 @@ trait Server {
 }
 
 object Server {
+  val metadataTopicName = "@metadata"

Review comment:
       Is this used anywhere?

##########
File path: build.gradle
##########
@@ -1351,6 +1351,50 @@ project(':tools') {
   }
 }
 
+project(':shell') {
+  archivesBaseName = "kafka-shell"
+
+  dependencies {
+    compile libs.argparse4j
+    compile libs.jacksonDatabind
+    compile libs.jacksonJDK8Datatypes
+    compile libs.jline
+    compile libs.slf4jApi
+    compile project(':clients')
+    compile project(':core')
+    compile project(':log4j-appender')
+    compile project(':metadata')
+    compile project(':raft')
+
+    compile libs.jacksonJaxrsJsonProvider
+
+    testCompile project(':clients')
+    testCompile libs.junitJupiter
+    testCompile project(':clients').sourceSets.test.output

Review comment:
       Do we actually need this here? This would only be needed if there were a 
class in clients/src/test that we depend on. Doing this "output" kind of 
dependency in Gradle can slow down the build, so we should avoid it if possible.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to