FrankYang0529 commented on code in PR #16850:
URL: https://github.com/apache/kafka/pull/16850#discussion_r1720994012


##########
tools/src/main/java/org/apache/kafka/tools/BrokerApiVersionsCommand.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.clients.consumer.internals.RequestFuture;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ApiVersionsRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import joptsimple.OptionSpec;
+
+public class BrokerApiVersionsCommand {
+    public static void main(String... args) { // Command-line entry point.
+        Exit.exit(mainNoExit(args)); // The status returned by mainNoExit is handed to Exit.exit.
+    }
+
+    static int mainNoExit(String... args) { // Runs the tool and reports the outcome as a status code: 0 on success, 1 on any failure.
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) { // Every Throwable is turned into status 1 rather than propagated to the caller.
+            System.err.println(e.getMessage()); // Message first, then the full stack trace, both on stderr.
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    public static void execute(String... args) throws IOException, 
InterruptedException { // Parses the options, then prints one stdout line per entry returned by listAllBrokerVersionInfo().
+        BrokerVersionCommandOptions opts = new 
BrokerVersionCommandOptions(args);
+        try (AdminClient adminClient = createAdminClient(opts)) { // try-with-resources closes the client when the block exits.
+            adminClient.awaitBrokers();
+            Map<Node, KafkaFuture<NodeApiVersions>> brokerMap = 
adminClient.listAllBrokerVersionInfo();
+            brokerMap.forEach((broker, future) -> {
+                try {
+                    NodeApiVersions apiVersions = future.get(); // Blocks until this broker's result is available.
+                    System.out.print(broker + " -> " + 
apiVersions.toString(true) + "\n");
+                } catch (Exception e) { // A failed lookup is printed inline for that broker; the remaining entries are still processed.
+                    System.out.print(broker + " -> ERROR: " + e.getMessage() + 
"\n");
+                }
+            });
+        }
+    }
+
+    private static AdminClient createAdminClient(BrokerVersionCommandOptions 
opts) throws IOException { // Builds the client from the --command-config file (when given) plus the --bootstrap-server value.
+        Properties props = opts.options.has(opts.commandConfigOpt) ?
+                Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) :
+                new Properties(); // Without --command-config the properties start out empty.
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
opts.options.valueOf(opts.bootstrapServerOpt)); // put() replaces any bootstrap servers entry that came from the file.
+        return AdminClient.create(props);
+    }
+
+    private static class BrokerVersionCommandOptions extends 
CommandDefaultOptions { // Command-line options of this tool: --bootstrap-server (required) and --command-config.
+        private static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The 
server to connect to.";
+        private static final String COMMAND_CONFIG_DOC = "A property file 
containing configs to be passed to Admin Client.";
+
+        final OptionSpec<String> commandConfigOpt; // --command-config: path of a properties file.
+        final OptionSpec<String> bootstrapServerOpt; // --bootstrap-server: server(s) used for bootstrapping.
+
+        BrokerVersionCommandOptions(String[] args) { // Registers both options, parses args, then validates the result.
+            super(args);
+            commandConfigOpt = parser.accepts("command-config", 
COMMAND_CONFIG_DOC)
+                    .withRequiredArg()
+                    .describedAs("command config property file")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", 
BOOTSTRAP_SERVER_DOC)
+                    .withRequiredArg()
+                    .describedAs("server(s) to use for bootstrapping")
+                    .ofType(String.class);
+            options = parser.parse(args); // Parsing happens only after both options have been registered.
+            checkArgs();
+        }
+
+        private void checkArgs() { // Delegates help/version handling to CommandLineUtils, then requires --bootstrap-server.
+            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to 
retrieve broker version information.");
+            CommandLineUtils.checkRequiredArgs(parser, options, 
bootstrapServerOpt);
+        }
+    }
+
+    private static class AdminClient implements AutoCloseable {
+        private static final Logger LOGGER = 
LoggerFactory.getLogger(AdminClient.class);
+        private static final int DEFAULT_CONNECTION_MAX_IDLE_MS = 9 * 60 * 
1000; // 9 minutes in milliseconds; passed to the Selector in create().
+        private static final int DEFAULT_REQUEST_TIMEOUT_MS = 5000; // Default for REQUEST_TIMEOUT_MS_CONFIG; also the poll timer length used by send().
+        private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 
= 100;
+        private static final int DEFAULT_RECONNECT_BACKOFF_MS = 50;
+        private static final int DEFAULT_RECONNECT_BACKOFF_MAX = 50; // Same value as DEFAULT_RECONNECT_BACKOFF_MS.
+        private static final int DEFAULT_SEND_BUFFER_BYTES = 128 * 1024;
+        private static final int DEFAULT_RECEIVE_BUFFER_BYTES = 32 * 1024;
+        private static final int DEFAULT_RETRY_BACKOFF_MS = 100; // Default for RETRY_BACKOFF_MS_CONFIG.
+
+        private static final AtomicInteger ADMIN_CLIENT_ID_SEQUENCE = new 
AtomicInteger(1); // Source of the numeric suffix in the "admin-N" client ids built by create().
+        private static final ConfigDef ADMIN_CONFIG_DEF = new ConfigDef() // Config keys defined for this client, plus the client SSL and SASL definitions.
+                .define(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, 
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG, 
ConfigDef.Type.STRING, ClientDnsLookup.USE_ALL_DNS_IPS.toString(), 
ConfigDef.ValidString.in(ClientDnsLookup.USE_ALL_DNS_IPS.toString(), 
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()), 
ConfigDef.Importance.MEDIUM, CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
+                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
ConfigDef.Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, 
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(SecurityProtocol.class)),
 ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+                .define(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 
ConfigDef.Type.INT, DEFAULT_REQUEST_TIMEOUT_MS, ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
+                
.define(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, 
ConfigDef.Type.LONG, 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS, 
ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC)
+                
.define(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, 
ConfigDef.Type.LONG, 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS, 
ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
+                .define(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 
ConfigDef.Type.LONG, DEFAULT_RETRY_BACKOFF_MS, ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
+                .withClientSslSupport()
+                .withClientSaslSupport();
+
+        private final Time time; // Clock used to build poll timers in send().
+        private final ConsumerNetworkClient client; // Network client through which every request is sent and polled.
+        private final List<Node> bootstrapBrokers; // Nodes tried in order by sendAnyNode().
+
+        static AdminClient create(Properties props) { // Convenience overload: wraps props in an AbstractConfig built against ADMIN_CONFIG_DEF.
+            return create(new AbstractConfig(ADMIN_CONFIG_DEF, props, false)); // NOTE(review): the trailing false is presumably AbstractConfig's doLog flag - confirm.
+        }
+
+        static AdminClient create(AbstractConfig config) { // Wires up Metadata, Selector, NetworkClient and ConsumerNetworkClient, then returns the client.
+            String clientId = "admin-" + 
ADMIN_CLIENT_ID_SEQUENCE.getAndIncrement(); // Each call takes the next sequence number, so ids differ per client.
+            LogContext logContext = new LogContext("[LegacyAdminClient 
clientId=" + clientId + "] ");
+            Time time = Time.SYSTEM;
+            Metrics metrics = new Metrics(time);
+            Metadata metadata = new Metadata(
+                    CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MS, // Library defaults are used here, not the RETRY_BACKOFF_MS_CONFIG value from config.
+                    CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MAX_MS,
+                    60 * 60 * 1000L, logContext, // One hour in milliseconds; NOTE(review): presumably the metadata expiry - confirm against the Metadata constructor.
+                    new ClusterResourceListeners());
+            metadata.bootstrap(ClientUtils.parseAndValidateAddresses( // Seeds the metadata with the configured bootstrap addresses.
+                    
config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
+                    
config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)));
+            Selector selector = new Selector(
+                    DEFAULT_CONNECTION_MAX_IDLE_MS,
+                    metrics,
+                    time,
+                    "admin",
+                    ClientUtils.createChannelBuilder(config, time, logContext),
+                    logContext);
+            NetworkClient networkClient = new NetworkClient(
+                    selector,
+                    metadata,
+                    clientId,
+                    DEFAULT_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
+                    DEFAULT_RECONNECT_BACKOFF_MS,
+                    DEFAULT_RECONNECT_BACKOFF_MAX,
+                    DEFAULT_SEND_BUFFER_BYTES,
+                    DEFAULT_RECEIVE_BUFFER_BYTES,
+                    
config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
+                    
config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
+                    
config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
+                    time,
+                    true, // NOTE(review): meaning of this positional flag is not visible here; presumably discoverBrokerVersions - confirm.
+                    new ApiVersions(),
+                    logContext,
+                    MetadataRecoveryStrategy.NONE);
+            ConsumerNetworkClient highLevelClient = new ConsumerNetworkClient(
+                    logContext,
+                    networkClient,
+                    metadata,
+                    time,
+                    
config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG),
+                    
config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
+                    Integer.MAX_VALUE); // NOTE(review): presumably the maximum poll timeout - confirm against ConsumerNetworkClient.
+            return new AdminClient(time, highLevelClient, 
metadata.fetch().nodes()); // The node list is read from the metadata bootstrapped above.
+        }
+
+        AdminClient(Time time, ConsumerNetworkClient client, List<Node> 
bootstrapBrokers) { // Stores the three collaborators as given; the list is kept by reference, not copied.
+            this.time = time;
+            this.client = client;
+            this.bootstrapBrokers = bootstrapBrokers;
+        }
+
+        private AbstractResponse send(Node target, AbstractRequest.Builder<?> 
request) { // Sends request to target and polls the network client until the request future is done.
+            RequestFuture<ClientResponse> future = client.send(target, 
request);
+            while (!future.isDone()) { // Keeps polling for as long as the future is not done; the loop itself has no overall deadline.
+                client.poll(time.timer(DEFAULT_REQUEST_TIMEOUT_MS)); // Timer length is the compile-time constant, not the configured REQUEST_TIMEOUT_MS_CONFIG value.
+            }
+            if (future.succeeded()) {
+                return future.value().responseBody();
+            } else {
+                throw future.exception(); // Rethrows the failure recorded on the future.
+            }
+        }
+
+        private AbstractResponse sendAnyNode(AbstractRequest.Builder<?> 
request) { // Tries the bootstrap brokers in list order and returns the first response obtained.
+            for (Node broker : bootstrapBrokers) {
+                try {
+                    return send(broker, request);
+                } catch (AuthenticationException e) { // Authentication failures are rethrown at once; no further broker is tried.
+                    throw e;
+                } catch (Exception e) { // Any other failure is logged at debug level and the loop moves on to the next broker.
+                    LOGGER.debug("Request {} failed against node {}", 
request.apiKey(), broker, e);
+                }
+            } // Reaching this point means no broker produced a response.
+            throw new RuntimeException("Request " + request.apiKey() + " 
failed on brokers " + bootstrapBrokers);
+        }
+
+        private KafkaFuture<NodeApiVersions> getNodeApiVersions(Node node) {
+            final KafkaFutureImpl<NodeApiVersions> future = new 
KafkaFutureImpl<>();
+            try {
+                ApiVersionsResponse response = (ApiVersionsResponse) 
send(node, new ApiVersionsRequest.Builder());

Review Comment:
   Hi @chia7712, the original class sends the requests one by one. I think we 
can keep that behavior and refactor it in the future if someone complains 
about it. Thanks.
   
   
https://github.com/apache/kafka/blob/c5e91546721b6b160732667ca7450d22927cc3b3/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala#L136-L138



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to