chia7712 commented on code in PR #16850:
URL: https://github.com/apache/kafka/pull/16850#discussion_r1713090302


##########
tools/src/main/java/org/apache/kafka/tools/BrokerApiVersionsCommand.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.clients.consumer.internals.RequestFuture;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ApiVersionsRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import joptsimple.OptionSpec;
+
+public class BrokerApiVersionsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    public static void execute(String... args) throws IOException {
+        BrokerVersionCommandOptions opts = new 
BrokerVersionCommandOptions(args);
+        try (AdminClient adminClient = createAdminClient(opts)) {
+            adminClient.awaitBrokers();
+            Map<Node, KafkaFuture<NodeApiVersions>> brokerMap = 
adminClient.listAllBrokerVersionInfo();
+            brokerMap.forEach((broker, future) -> {
+                try {
+                    NodeApiVersions apiVersions = future.get();
+                    System.out.print(broker + " -> " + 
apiVersions.toString(true) + "\n");
+                } catch (Exception e) {
+                    System.out.print(broker + " -> ERROR: " + e.getMessage() + 
"\n");
+                }
+            });
+        }
+    }
+
+    private static AdminClient createAdminClient(BrokerVersionCommandOptions 
opts) throws IOException {
+        Properties props = opts.options.has(opts.commandConfigOpt) ?
+                Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) :
+                new Properties();
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
opts.options.valueOf(opts.bootstrapServerOpt));
+        return AdminClient.create(props);
+    }
+
+    private static class BrokerVersionCommandOptions extends 
CommandDefaultOptions {
+        private static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The 
server to connect to.";
+        private static final String COMMAND_CONFIG_DOC = "A property file 
containing configs to be passed to Admin Client.";
+
+        final OptionSpec<String> commandConfigOpt;
+        final OptionSpec<String> bootstrapServerOpt;
+
+        BrokerVersionCommandOptions(String[] args) {
+            super(args);
+            commandConfigOpt = parser.accepts("command-config", 
COMMAND_CONFIG_DOC)
+                    .withRequiredArg()
+                    .describedAs("command config property file")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", 
BOOTSTRAP_SERVER_DOC)
+                    .withRequiredArg()
+                    .describedAs("server(s) to use for bootstrapping")
+                    .ofType(String.class);
+            options = parser.parse(args);
+            checkArgs();
+        }
+
+        private void checkArgs() {
+            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to 
retrieve broker version information.");
+            CommandLineUtils.checkRequiredArgs(parser, options, 
bootstrapServerOpt);
+        }
+    }
+
+    private static class AdminClient implements AutoCloseable {
+        private static final Logger LOGGER = 
LoggerFactory.getLogger(AdminClient.class);
+        private static final int DEFAULT_CONNECTION_MAX_IDLE_MS = 9 * 60 * 
1000;
+        private static final int DEFAULT_REQUEST_TIMEOUT_MS = 5000;
+        private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 
= 100;
+        private static final int DEFAULT_RECONNECT_BACKOFF_MS = 50;
+        private static final int DEFAULT_RECONNECT_BACKOFF_MAX = 50;
+        private static final int DEFAULT_SEND_BUFFER_BYTES = 128 * 1024;
+        private static final int DEFAULT_RECEIVE_BUFFER_BYTES = 32 * 1024;
+        private static final int DEFAULT_RETRY_BACKOFF_MS = 100;
+
+        private static final AtomicInteger ADMIN_CLIENT_ID_SEQUENCE = new 
AtomicInteger(1);
+        private static final ConfigDef ADMIN_CONFIG_DEF = new ConfigDef()
+                .define(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, 
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG, 
ConfigDef.Type.STRING, ClientDnsLookup.USE_ALL_DNS_IPS.toString(), 
ConfigDef.ValidString.in(ClientDnsLookup.USE_ALL_DNS_IPS.toString(), 
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()), 
ConfigDef.Importance.MEDIUM, CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
+                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
ConfigDef.Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, 
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(SecurityProtocol.class)),
 ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+                .define(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 
ConfigDef.Type.INT, DEFAULT_REQUEST_TIMEOUT_MS, ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
+                
.define(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, 
ConfigDef.Type.LONG, 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS, 
ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC)
+                
.define(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, 
ConfigDef.Type.LONG, 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS, 
ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
+                .define(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 
ConfigDef.Type.LONG, DEFAULT_RETRY_BACKOFF_MS, ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
+                .withClientSslSupport()
+                .withClientSaslSupport();
+
+        private volatile boolean running = true;
+        private final Time time;
+        private final ConsumerNetworkClient client;
+        private final List<Node> bootstrapBrokers;
+        private final ConcurrentLinkedQueue<RequestFuture<ClientResponse>> 
pendingFutures = new ConcurrentLinkedQueue<>();
+
+        static class AdminConfig extends AbstractConfig {

Review Comment:
   Could you please remove it as we can use `AbstractConfig` directly? for 
instance:
   ```java
           static AdminClient create(Map<String, ?> props) {
               return create(new AbstractConfig(ADMIN_CONFIG_DEF, props, 
false));
           }
   ```



##########
tools/src/main/java/org/apache/kafka/tools/BrokerApiVersionsCommand.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.clients.consumer.internals.RequestFuture;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ApiVersionsRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import joptsimple.OptionSpec;
+
+public class BrokerApiVersionsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    public static void execute(String... args) throws IOException {
+        BrokerVersionCommandOptions opts = new 
BrokerVersionCommandOptions(args);
+        try (AdminClient adminClient = createAdminClient(opts)) {
+            adminClient.awaitBrokers();
+            Map<Node, KafkaFuture<NodeApiVersions>> brokerMap = 
adminClient.listAllBrokerVersionInfo();
+            brokerMap.forEach((broker, future) -> {
+                try {
+                    NodeApiVersions apiVersions = future.get();
+                    System.out.print(broker + " -> " + 
apiVersions.toString(true) + "\n");
+                } catch (Exception e) {
+                    System.out.print(broker + " -> ERROR: " + e.getMessage() + 
"\n");
+                }
+            });
+        }
+    }
+
+    private static AdminClient createAdminClient(BrokerVersionCommandOptions 
opts) throws IOException {
+        Properties props = opts.options.has(opts.commandConfigOpt) ?
+                Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) :
+                new Properties();
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
opts.options.valueOf(opts.bootstrapServerOpt));
+        return AdminClient.create(props);
+    }
+
+    private static class BrokerVersionCommandOptions extends 
CommandDefaultOptions {
+        private static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The 
server to connect to.";
+        private static final String COMMAND_CONFIG_DOC = "A property file 
containing configs to be passed to Admin Client.";
+
+        final OptionSpec<String> commandConfigOpt;
+        final OptionSpec<String> bootstrapServerOpt;
+
+        BrokerVersionCommandOptions(String[] args) {
+            super(args);
+            commandConfigOpt = parser.accepts("command-config", 
COMMAND_CONFIG_DOC)
+                    .withRequiredArg()
+                    .describedAs("command config property file")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", 
BOOTSTRAP_SERVER_DOC)
+                    .withRequiredArg()
+                    .describedAs("server(s) to use for bootstrapping")
+                    .ofType(String.class);
+            options = parser.parse(args);
+            checkArgs();
+        }
+
+        private void checkArgs() {
+            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to 
retrieve broker version information.");
+            CommandLineUtils.checkRequiredArgs(parser, options, 
bootstrapServerOpt);
+        }
+    }
+
+    private static class AdminClient implements AutoCloseable {
+        private static final Logger LOGGER = 
LoggerFactory.getLogger(AdminClient.class);
+        private static final int DEFAULT_CONNECTION_MAX_IDLE_MS = 9 * 60 * 
1000;
+        private static final int DEFAULT_REQUEST_TIMEOUT_MS = 5000;
+        private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 
= 100;
+        private static final int DEFAULT_RECONNECT_BACKOFF_MS = 50;
+        private static final int DEFAULT_RECONNECT_BACKOFF_MAX = 50;
+        private static final int DEFAULT_SEND_BUFFER_BYTES = 128 * 1024;
+        private static final int DEFAULT_RECEIVE_BUFFER_BYTES = 32 * 1024;
+        private static final int DEFAULT_RETRY_BACKOFF_MS = 100;
+
+        private static final AtomicInteger ADMIN_CLIENT_ID_SEQUENCE = new 
AtomicInteger(1);
+        private static final ConfigDef ADMIN_CONFIG_DEF = new ConfigDef()
+                .define(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, 
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG, 
ConfigDef.Type.STRING, ClientDnsLookup.USE_ALL_DNS_IPS.toString(), 
ConfigDef.ValidString.in(ClientDnsLookup.USE_ALL_DNS_IPS.toString(), 
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()), 
ConfigDef.Importance.MEDIUM, CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
+                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
ConfigDef.Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, 
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(SecurityProtocol.class)),
 ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+                .define(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 
ConfigDef.Type.INT, DEFAULT_REQUEST_TIMEOUT_MS, ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
+                
.define(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, 
ConfigDef.Type.LONG, 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS, 
ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC)
+                
.define(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, 
ConfigDef.Type.LONG, 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS, 
ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
+                .define(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 
ConfigDef.Type.LONG, DEFAULT_RETRY_BACKOFF_MS, ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
+                .withClientSslSupport()
+                .withClientSaslSupport();
+
+        private volatile boolean running = true;
+        private final Time time;
+        private final ConsumerNetworkClient client;
+        private final List<Node> bootstrapBrokers;
+        private final ConcurrentLinkedQueue<RequestFuture<ClientResponse>> 
pendingFutures = new ConcurrentLinkedQueue<>();
+
+        static class AdminConfig extends AbstractConfig {
+            AdminConfig(Map<?, ?> originals) {
+                super(ADMIN_CONFIG_DEF, originals, false);
+            }
+        }
+
+        static AdminClient create(Properties props) {
+            return create(Utils.propsToMap(props));
+        }
+
+        static AdminClient create(Map<String, ?> props) {
+            return create(new AdminConfig(props));
+        }
+
+        static AdminClient create(AdminConfig config) {
+            String clientId = "admin-" + 
ADMIN_CLIENT_ID_SEQUENCE.getAndIncrement();
+            LogContext logContext = new LogContext("[LegacyAdminClient 
clientId=" + clientId + "] ");
+            Time time = Time.SYSTEM;
+            Metrics metrics = new Metrics(time);
+            Metadata metadata = new Metadata(
+                    CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MS,
+                    CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MAX_MS,
+                    60 * 60 * 1000L, logContext,
+                    new ClusterResourceListeners());
+            metadata.bootstrap(ClientUtils.parseAndValidateAddresses(
+                    
config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
+                    
config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)));
+            Selector selector = new Selector(
+                    DEFAULT_CONNECTION_MAX_IDLE_MS,
+                    metrics,
+                    time,
+                    "admin",
+                    ClientUtils.createChannelBuilder(config, time, logContext),
+                    logContext);
+            NetworkClient networkClient = new NetworkClient(
+                    selector,
+                    metadata,
+                    clientId,
+                    DEFAULT_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
+                    DEFAULT_RECONNECT_BACKOFF_MS,
+                    DEFAULT_RECONNECT_BACKOFF_MAX,
+                    DEFAULT_SEND_BUFFER_BYTES,
+                    DEFAULT_RECEIVE_BUFFER_BYTES,
+                    
config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
+                    
config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
+                    
config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
+                    time,
+                    true,
+                    new ApiVersions(),
+                    logContext,
+                    MetadataRecoveryStrategy.NONE);
+            ConsumerNetworkClient highLevelClient = new ConsumerNetworkClient(
+                    logContext,
+                    networkClient,
+                    metadata,
+                    time,
+                    
config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG),
+                    
config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
+                    Integer.MAX_VALUE);
+            return new AdminClient(time, highLevelClient, 
metadata.fetch().nodes());
+        }
+
+        AdminClient(Time time, ConsumerNetworkClient client, List<Node> 
bootstrapBrokers) {
+            this.time = time;
+            this.client = client;
+            this.bootstrapBrokers = bootstrapBrokers;
+            startNetworkThread();
+        }
+
+        private void startNetworkThread() {
+            Thread networkThread = new Thread(() -> {
+                try {
+                    while (running) {
+                        client.poll(time.timer(Long.MAX_VALUE));
+                    }
+                } catch (Throwable t) {
+                    LOGGER.error("admin-client-network-thread exited", t);
+                } finally {
+                    pendingFutures.forEach(future -> {
+                        try {
+                            future.raise(Errors.UNKNOWN_SERVER_ERROR);
+                        } catch (IllegalStateException ignored) {
+                        }
+                    });
+                    pendingFutures.clear();
+                }
+            }, "admin-client-network-thread");
+            networkThread.setDaemon(true);
+            networkThread.start();
+        }
+
+        private AbstractResponse send(Node target, AbstractRequest.Builder<?> 
request) throws InterruptedException {
+            RequestFuture<ClientResponse> future = client.send(target, 
request);
+            pendingFutures.add(future);
+            future.awaitDone(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+            pendingFutures.remove(future);
+            if (future.succeeded()) {
+                return future.value().responseBody();
+            } else {
+                throw future.exception();
+            }
+        }
+
+        private AbstractResponse sendAnyNode(AbstractRequest.Builder<?> 
request) {
+            for (Node broker : bootstrapBrokers) {
+                try {
+                    return send(broker, request);
+                } catch (AuthenticationException e) {
+                    throw e;
+                } catch (Exception e) {
+                    LOGGER.debug("Request {} failed against node {}", 
request.apiKey(), broker, e);
+                }
+            }
+            throw new RuntimeException("Request " + request.apiKey() + " 
failed on brokers " + bootstrapBrokers);
+        }
+
+        private KafkaFuture<NodeApiVersions> getNodeApiVersions(Node node) {
+            final KafkaFutureImpl<NodeApiVersions> future = new 
KafkaFutureImpl<>();
+            try {
+                ApiVersionsResponse response = (ApiVersionsResponse) 
send(node, new ApiVersionsRequest.Builder());
+                Errors error = Errors.forCode(response.data().errorCode());
+                if (error.exception() != null) {
+                    future.completeExceptionally(error.exception());
+                } else {
+                    future.complete(new 
NodeApiVersions(response.data().apiKeys(), response.data().supportedFeatures(), 
response.data().zkMigrationReady()));
+                }
+            } catch (Exception e) {
+                future.completeExceptionally(e);
+            }
+
+            return future;
+        }
+
+        public void awaitBrokers() {
+            List<Node> nodes;
+            do {
+                nodes = findAllBrokers();
+                if (nodes.isEmpty()) {
+                    try {
+                        Thread.sleep(50);
+                    } catch (InterruptedException e) {

Review Comment:
   not sure why we need to catch it here. could you please throw it directly as 
it is a command-line tool



##########
tools/src/main/java/org/apache/kafka/tools/BrokerApiVersionsCommand.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.clients.consumer.internals.RequestFuture;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ApiVersionsRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import joptsimple.OptionSpec;
+
+public class BrokerApiVersionsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    public static void execute(String... args) throws IOException {
+        BrokerVersionCommandOptions opts = new 
BrokerVersionCommandOptions(args);
+        try (AdminClient adminClient = createAdminClient(opts)) {
+            adminClient.awaitBrokers();
+            Map<Node, KafkaFuture<NodeApiVersions>> brokerMap = 
adminClient.listAllBrokerVersionInfo();
+            brokerMap.forEach((broker, future) -> {
+                try {
+                    NodeApiVersions apiVersions = future.get();
+                    System.out.print(broker + " -> " + 
apiVersions.toString(true) + "\n");
+                } catch (Exception e) {
+                    System.out.print(broker + " -> ERROR: " + e.getMessage() + 
"\n");
+                }
+            });
+        }
+    }
+
+    private static AdminClient createAdminClient(BrokerVersionCommandOptions 
opts) throws IOException {
+        Properties props = opts.options.has(opts.commandConfigOpt) ?
+                Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) :
+                new Properties();
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
opts.options.valueOf(opts.bootstrapServerOpt));
+        return AdminClient.create(props);
+    }
+
+    private static class BrokerVersionCommandOptions extends 
CommandDefaultOptions {
+        private static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The 
server to connect to.";
+        private static final String COMMAND_CONFIG_DOC = "A property file 
containing configs to be passed to Admin Client.";
+
+        final OptionSpec<String> commandConfigOpt;
+        final OptionSpec<String> bootstrapServerOpt;
+
+        BrokerVersionCommandOptions(String[] args) {
+            super(args);
+            commandConfigOpt = parser.accepts("command-config", 
COMMAND_CONFIG_DOC)
+                    .withRequiredArg()
+                    .describedAs("command config property file")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", 
BOOTSTRAP_SERVER_DOC)
+                    .withRequiredArg()
+                    .describedAs("server(s) to use for bootstrapping")
+                    .ofType(String.class);
+            options = parser.parse(args);
+            checkArgs();
+        }
+
+        private void checkArgs() {
+            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to 
retrieve broker version information.");
+            CommandLineUtils.checkRequiredArgs(parser, options, 
bootstrapServerOpt);
+        }
+    }
+
+    private static class AdminClient implements AutoCloseable {
+        private static final Logger LOGGER = 
LoggerFactory.getLogger(AdminClient.class);
+        private static final int DEFAULT_CONNECTION_MAX_IDLE_MS = 9 * 60 * 
1000;
+        private static final int DEFAULT_REQUEST_TIMEOUT_MS = 5000;
+        private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 
= 100;
+        private static final int DEFAULT_RECONNECT_BACKOFF_MS = 50;
+        private static final int DEFAULT_RECONNECT_BACKOFF_MAX = 50;
+        private static final int DEFAULT_SEND_BUFFER_BYTES = 128 * 1024;
+        private static final int DEFAULT_RECEIVE_BUFFER_BYTES = 32 * 1024;
+        private static final int DEFAULT_RETRY_BACKOFF_MS = 100;
+
+        private static final AtomicInteger ADMIN_CLIENT_ID_SEQUENCE = new 
AtomicInteger(1);
+        private static final ConfigDef ADMIN_CONFIG_DEF = new ConfigDef()
+                .define(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, 
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG, 
ConfigDef.Type.STRING, ClientDnsLookup.USE_ALL_DNS_IPS.toString(), 
ConfigDef.ValidString.in(ClientDnsLookup.USE_ALL_DNS_IPS.toString(), 
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()), 
ConfigDef.Importance.MEDIUM, CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
+                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
ConfigDef.Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, 
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(SecurityProtocol.class)),
 ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+                .define(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 
ConfigDef.Type.INT, DEFAULT_REQUEST_TIMEOUT_MS, ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
+                
.define(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, 
ConfigDef.Type.LONG, 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS, 
ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC)
+                
.define(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, 
ConfigDef.Type.LONG, 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS, 
ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
+                .define(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 
ConfigDef.Type.LONG, DEFAULT_RETRY_BACKOFF_MS, ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
+                .withClientSslSupport()
+                .withClientSaslSupport();
+
+        private volatile boolean running = true;
+        private final Time time;
+        private final ConsumerNetworkClient client;
+        private final List<Node> bootstrapBrokers;
+        private final ConcurrentLinkedQueue<RequestFuture<ClientResponse>> 
pendingFutures = new ConcurrentLinkedQueue<>();
+
+        static class AdminConfig extends AbstractConfig {
+            AdminConfig(Map<?, ?> originals) {
+                super(ADMIN_CONFIG_DEF, originals, false);
+            }
+        }
+
+        static AdminClient create(Properties props) {
+            return create(Utils.propsToMap(props));
+        }
+
+        static AdminClient create(Map<String, ?> props) {

Review Comment:
   Do we need those constructors?



##########
tools/src/main/java/org/apache/kafka/tools/BrokerApiVersionsCommand.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.clients.consumer.internals.RequestFuture;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ApiVersionsRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import joptsimple.OptionSpec;
+
+public class BrokerApiVersionsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    public static void execute(String... args) throws IOException {
+        BrokerVersionCommandOptions opts = new 
BrokerVersionCommandOptions(args);
+        try (AdminClient adminClient = createAdminClient(opts)) {
+            adminClient.awaitBrokers();
+            Map<Node, KafkaFuture<NodeApiVersions>> brokerMap = 
adminClient.listAllBrokerVersionInfo();
+            brokerMap.forEach((broker, future) -> {
+                try {
+                    NodeApiVersions apiVersions = future.get();
+                    System.out.print(broker + " -> " + 
apiVersions.toString(true) + "\n");
+                } catch (Exception e) {
+                    System.out.print(broker + " -> ERROR: " + e.getMessage() + 
"\n");
+                }
+            });
+        }
+    }
+
+    private static AdminClient createAdminClient(BrokerVersionCommandOptions 
opts) throws IOException {
+        Properties props = opts.options.has(opts.commandConfigOpt) ?
+                Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) :
+                new Properties();
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
opts.options.valueOf(opts.bootstrapServerOpt));
+        return AdminClient.create(props);
+    }
+
+    private static class BrokerVersionCommandOptions extends 
CommandDefaultOptions {
+        private static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The 
server to connect to.";
+        private static final String COMMAND_CONFIG_DOC = "A property file 
containing configs to be passed to Admin Client.";
+
+        final OptionSpec<String> commandConfigOpt;
+        final OptionSpec<String> bootstrapServerOpt;
+
+        BrokerVersionCommandOptions(String[] args) {
+            super(args);
+            commandConfigOpt = parser.accepts("command-config", 
COMMAND_CONFIG_DOC)
+                    .withRequiredArg()
+                    .describedAs("command config property file")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", 
BOOTSTRAP_SERVER_DOC)
+                    .withRequiredArg()
+                    .describedAs("server(s) to use for bootstrapping")
+                    .ofType(String.class);
+            options = parser.parse(args);
+            checkArgs();
+        }
+
+        private void checkArgs() {
+            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to 
retrieve broker version information.");
+            CommandLineUtils.checkRequiredArgs(parser, options, 
bootstrapServerOpt);
+        }
+    }
+
+    private static class AdminClient implements AutoCloseable {
+        private static final Logger LOGGER = 
LoggerFactory.getLogger(AdminClient.class);
+        private static final int DEFAULT_CONNECTION_MAX_IDLE_MS = 9 * 60 * 
1000;
+        private static final int DEFAULT_REQUEST_TIMEOUT_MS = 5000;
+        private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 
= 100;
+        private static final int DEFAULT_RECONNECT_BACKOFF_MS = 50;
+        private static final int DEFAULT_RECONNECT_BACKOFF_MAX = 50;
+        private static final int DEFAULT_SEND_BUFFER_BYTES = 128 * 1024;
+        private static final int DEFAULT_RECEIVE_BUFFER_BYTES = 32 * 1024;
+        private static final int DEFAULT_RETRY_BACKOFF_MS = 100;
+
+        private static final AtomicInteger ADMIN_CLIENT_ID_SEQUENCE = new 
AtomicInteger(1);
+        private static final ConfigDef ADMIN_CONFIG_DEF = new ConfigDef()
+                .define(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, 
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG, 
ConfigDef.Type.STRING, ClientDnsLookup.USE_ALL_DNS_IPS.toString(), 
ConfigDef.ValidString.in(ClientDnsLookup.USE_ALL_DNS_IPS.toString(), 
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()), 
ConfigDef.Importance.MEDIUM, CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
+                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
ConfigDef.Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, 
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(SecurityProtocol.class)),
 ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+                .define(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 
ConfigDef.Type.INT, DEFAULT_REQUEST_TIMEOUT_MS, ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
+                
.define(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, 
ConfigDef.Type.LONG, 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS, 
ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC)
+                
.define(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, 
ConfigDef.Type.LONG, 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS, 
ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
+                .define(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 
ConfigDef.Type.LONG, DEFAULT_RETRY_BACKOFF_MS, ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
+                .withClientSslSupport()
+                .withClientSaslSupport();
+
+        private volatile boolean running = true;
+        private final Time time;
+        private final ConsumerNetworkClient client;
+        private final List<Node> bootstrapBrokers;
+        private final ConcurrentLinkedQueue<RequestFuture<ClientResponse>> 
pendingFutures = new ConcurrentLinkedQueue<>();
+
+        static class AdminConfig extends AbstractConfig {
+            AdminConfig(Map<?, ?> originals) {
+                super(ADMIN_CONFIG_DEF, originals, false);
+            }
+        }
+
+        static AdminClient create(Properties props) {
+            return create(Utils.propsToMap(props));
+        }
+
+        static AdminClient create(Map<String, ?> props) {
+            return create(new AdminConfig(props));
+        }
+
+        static AdminClient create(AdminConfig config) {
+            String clientId = "admin-" + 
ADMIN_CLIENT_ID_SEQUENCE.getAndIncrement();
+            LogContext logContext = new LogContext("[LegacyAdminClient 
clientId=" + clientId + "] ");
+            Time time = Time.SYSTEM;
+            Metrics metrics = new Metrics(time);
+            Metadata metadata = new Metadata(
+                    CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MS,
+                    CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MAX_MS,
+                    60 * 60 * 1000L, logContext,
+                    new ClusterResourceListeners());
+            metadata.bootstrap(ClientUtils.parseAndValidateAddresses(
+                    
config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
+                    
config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)));
+            Selector selector = new Selector(
+                    DEFAULT_CONNECTION_MAX_IDLE_MS,
+                    metrics,
+                    time,
+                    "admin",
+                    ClientUtils.createChannelBuilder(config, time, logContext),
+                    logContext);
+            NetworkClient networkClient = new NetworkClient(
+                    selector,
+                    metadata,
+                    clientId,
+                    DEFAULT_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
+                    DEFAULT_RECONNECT_BACKOFF_MS,
+                    DEFAULT_RECONNECT_BACKOFF_MAX,
+                    DEFAULT_SEND_BUFFER_BYTES,
+                    DEFAULT_RECEIVE_BUFFER_BYTES,
+                    
config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
+                    
config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
+                    
config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
+                    time,
+                    true,
+                    new ApiVersions(),
+                    logContext,
+                    MetadataRecoveryStrategy.NONE);
+            ConsumerNetworkClient highLevelClient = new ConsumerNetworkClient(
+                    logContext,
+                    networkClient,
+                    metadata,
+                    time,
+                    
config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG),
+                    
config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
+                    Integer.MAX_VALUE);
+            return new AdminClient(time, highLevelClient, 
metadata.fetch().nodes());
+        }
+
+        AdminClient(Time time, ConsumerNetworkClient client, List<Node> 
bootstrapBrokers) {
+            this.time = time;
+            this.client = client;
+            this.bootstrapBrokers = bootstrapBrokers;
+            startNetworkThread();
+        }
+
+        private void startNetworkThread() {
+            Thread networkThread = new Thread(() -> {
+                try {
+                    while (running) {
+                        client.poll(time.timer(Long.MAX_VALUE));
+                    }
+                } catch (Throwable t) {
+                    LOGGER.error("admin-client-network-thread exited", t);
+                } finally {
+                    pendingFutures.forEach(future -> {
+                        try {
+                            future.raise(Errors.UNKNOWN_SERVER_ERROR);
+                        } catch (IllegalStateException ignored) {
+                        }
+                    });
+                    pendingFutures.clear();
+                }
+            }, "admin-client-network-thread");
+            networkThread.setDaemon(true);
+            networkThread.start();
+        }
+
+        private AbstractResponse send(Node target, AbstractRequest.Builder<?> 
request) throws InterruptedException {
+            RequestFuture<ClientResponse> future = client.send(target, 
request);
+            pendingFutures.add(future);
+            future.awaitDone(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+            pendingFutures.remove(future);
+            if (future.succeeded()) {
+                return future.value().responseBody();
+            } else {
+                throw future.exception();
+            }
+        }
+
+        private AbstractResponse sendAnyNode(AbstractRequest.Builder<?> 
request) {
+            for (Node broker : bootstrapBrokers) {
+                try {
+                    return send(broker, request);
+                } catch (AuthenticationException e) {
+                    throw e;
+                } catch (Exception e) {
+                    LOGGER.debug("Request {} failed against node {}", 
request.apiKey(), broker, e);
+                }
+            }
+            throw new RuntimeException("Request " + request.apiKey() + " 
failed on brokers " + bootstrapBrokers);
+        }
+
+        private KafkaFuture<NodeApiVersions> getNodeApiVersions(Node node) {
+            final KafkaFutureImpl<NodeApiVersions> future = new 
KafkaFutureImpl<>();
+            try {
+                ApiVersionsResponse response = (ApiVersionsResponse) 
send(node, new ApiVersionsRequest.Builder());
+                Errors error = Errors.forCode(response.data().errorCode());
+                if (error.exception() != null) {
+                    future.completeExceptionally(error.exception());
+                } else {
+                    future.complete(new 
NodeApiVersions(response.data().apiKeys(), response.data().supportedFeatures(), 
response.data().zkMigrationReady()));
+                }
+            } catch (Exception e) {
+                future.completeExceptionally(e);
+            }
+
+            return future;
+        }
+
+        public void awaitBrokers() {
+            List<Node> nodes;
+            do {
+                nodes = findAllBrokers();
+                if (nodes.isEmpty()) {
+                    try {
+                        Thread.sleep(50);

Review Comment:
   `TimeUnit.MILLISECONDS.sleep(50);`



##########
tools/src/main/java/org/apache/kafka/tools/BrokerApiVersionsCommand.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.clients.consumer.internals.RequestFuture;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ApiVersionsRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import joptsimple.OptionSpec;
+
+public class BrokerApiVersionsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    public static void execute(String... args) throws IOException {
+        BrokerVersionCommandOptions opts = new 
BrokerVersionCommandOptions(args);
+        try (AdminClient adminClient = createAdminClient(opts)) {
+            adminClient.awaitBrokers();
+            Map<Node, KafkaFuture<NodeApiVersions>> brokerMap = 
adminClient.listAllBrokerVersionInfo();
+            brokerMap.forEach((broker, future) -> {
+                try {
+                    NodeApiVersions apiVersions = future.get();
+                    System.out.print(broker + " -> " + 
apiVersions.toString(true) + "\n");
+                } catch (Exception e) {
+                    System.out.print(broker + " -> ERROR: " + e.getMessage() + 
"\n");
+                }
+            });
+        }
+    }
+
+    private static AdminClient createAdminClient(BrokerVersionCommandOptions 
opts) throws IOException {
+        Properties props = opts.options.has(opts.commandConfigOpt) ?
+                Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) :
+                new Properties();
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
opts.options.valueOf(opts.bootstrapServerOpt));
+        return AdminClient.create(props);
+    }
+
+    private static class BrokerVersionCommandOptions extends 
CommandDefaultOptions {
+        private static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The 
server to connect to.";
+        private static final String COMMAND_CONFIG_DOC = "A property file 
containing configs to be passed to Admin Client.";
+
+        final OptionSpec<String> commandConfigOpt;
+        final OptionSpec<String> bootstrapServerOpt;
+
+        BrokerVersionCommandOptions(String[] args) {
+            super(args);
+            commandConfigOpt = parser.accepts("command-config", 
COMMAND_CONFIG_DOC)
+                    .withRequiredArg()
+                    .describedAs("command config property file")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", 
BOOTSTRAP_SERVER_DOC)
+                    .withRequiredArg()
+                    .describedAs("server(s) to use for bootstrapping")
+                    .ofType(String.class);
+            options = parser.parse(args);
+            checkArgs();
+        }
+
+        private void checkArgs() {
+            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to 
retrieve broker version information.");
+            CommandLineUtils.checkRequiredArgs(parser, options, 
bootstrapServerOpt);
+        }
+    }
+
+    private static class AdminClient implements AutoCloseable {
+        private static final Logger LOGGER = 
LoggerFactory.getLogger(AdminClient.class);
+        private static final int DEFAULT_CONNECTION_MAX_IDLE_MS = 9 * 60 * 
1000;
+        private static final int DEFAULT_REQUEST_TIMEOUT_MS = 5000;
+        private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 
= 100;
+        private static final int DEFAULT_RECONNECT_BACKOFF_MS = 50;
+        private static final int DEFAULT_RECONNECT_BACKOFF_MAX = 50;
+        private static final int DEFAULT_SEND_BUFFER_BYTES = 128 * 1024;
+        private static final int DEFAULT_RECEIVE_BUFFER_BYTES = 32 * 1024;
+        private static final int DEFAULT_RETRY_BACKOFF_MS = 100;
+
+        private static final AtomicInteger ADMIN_CLIENT_ID_SEQUENCE = new 
AtomicInteger(1);
+        private static final ConfigDef ADMIN_CONFIG_DEF = new ConfigDef()
+                .define(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, 
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG, 
ConfigDef.Type.STRING, ClientDnsLookup.USE_ALL_DNS_IPS.toString(), 
ConfigDef.ValidString.in(ClientDnsLookup.USE_ALL_DNS_IPS.toString(), 
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()), 
ConfigDef.Importance.MEDIUM, CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
+                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
ConfigDef.Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, 
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(SecurityProtocol.class)),
 ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+                .define(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 
ConfigDef.Type.INT, DEFAULT_REQUEST_TIMEOUT_MS, ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
+                
.define(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, 
ConfigDef.Type.LONG, 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS, 
ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC)
+                
.define(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, 
ConfigDef.Type.LONG, 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS, 
ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
+                .define(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 
ConfigDef.Type.LONG, DEFAULT_RETRY_BACKOFF_MS, ConfigDef.Importance.MEDIUM, 
CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
+                .withClientSslSupport()
+                .withClientSaslSupport();
+
+        private volatile boolean running = true;
+        private final Time time;
+        private final ConsumerNetworkClient client;
+        private final List<Node> bootstrapBrokers;
+        private final ConcurrentLinkedQueue<RequestFuture<ClientResponse>> 
pendingFutures = new ConcurrentLinkedQueue<>();
+
+        static class AdminConfig extends AbstractConfig {
+            AdminConfig(Map<?, ?> originals) {
+                super(ADMIN_CONFIG_DEF, originals, false);
+            }
+        }
+
+        static AdminClient create(Properties props) {
+            return create(Utils.propsToMap(props));
+        }
+
+        static AdminClient create(Map<String, ?> props) {
+            return create(new AdminConfig(props));
+        }
+
+        static AdminClient create(AdminConfig config) {
+            String clientId = "admin-" + 
ADMIN_CLIENT_ID_SEQUENCE.getAndIncrement();
+            LogContext logContext = new LogContext("[LegacyAdminClient 
clientId=" + clientId + "] ");
+            Time time = Time.SYSTEM;
+            Metrics metrics = new Metrics(time);
+            Metadata metadata = new Metadata(
+                    CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MS,
+                    CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MAX_MS,
+                    60 * 60 * 1000L, logContext,
+                    new ClusterResourceListeners());
+            metadata.bootstrap(ClientUtils.parseAndValidateAddresses(
+                    
config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
+                    
config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)));
+            Selector selector = new Selector(
+                    DEFAULT_CONNECTION_MAX_IDLE_MS,
+                    metrics,
+                    time,
+                    "admin",
+                    ClientUtils.createChannelBuilder(config, time, logContext),
+                    logContext);
+            NetworkClient networkClient = new NetworkClient(
+                    selector,
+                    metadata,
+                    clientId,
+                    DEFAULT_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
+                    DEFAULT_RECONNECT_BACKOFF_MS,
+                    DEFAULT_RECONNECT_BACKOFF_MAX,
+                    DEFAULT_SEND_BUFFER_BYTES,
+                    DEFAULT_RECEIVE_BUFFER_BYTES,
+                    
config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
+                    
config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
+                    
config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
+                    time,
+                    true,
+                    new ApiVersions(),
+                    logContext,
+                    MetadataRecoveryStrategy.NONE);
+            ConsumerNetworkClient highLevelClient = new ConsumerNetworkClient(
+                    logContext,
+                    networkClient,
+                    metadata,
+                    time,
+                    
config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG),
+                    
config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
+                    Integer.MAX_VALUE);
+            return new AdminClient(time, highLevelClient, 
metadata.fetch().nodes());
+        }
+
+        AdminClient(Time time, ConsumerNetworkClient client, List<Node> 
bootstrapBrokers) {
+            this.time = time;
+            this.client = client;
+            this.bootstrapBrokers = bootstrapBrokers;
+            startNetworkThread();
+        }
+
+        private void startNetworkThread() {
+            Thread networkThread = new Thread(() -> {
+                try {
+                    while (running) {
+                        client.poll(time.timer(Long.MAX_VALUE));
+                    }
+                } catch (Throwable t) {
+                    LOGGER.error("admin-client-network-thread exited", t);
+                } finally {
+                    pendingFutures.forEach(future -> {
+                        try {
+                            future.raise(Errors.UNKNOWN_SERVER_ERROR);
+                        } catch (IllegalStateException ignored) {
+                        }
+                    });
+                    pendingFutures.clear();
+                }
+            }, "admin-client-network-thread");
+            networkThread.setDaemon(true);
+            networkThread.start();
+        }
+
+        private AbstractResponse send(Node target, AbstractRequest.Builder<?> 
request) throws InterruptedException {
+            RequestFuture<ClientResponse> future = client.send(target, 
request);
+            pendingFutures.add(future);

Review Comment:
   Do we need another thread to poll client? Maybe we can poll the client by 
the same thread to simplify this tool?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to