chia7712 commented on code in PR #21080:
URL: https://github.com/apache/kafka/pull/21080#discussion_r3212666275


##########
clients/src/main/java/org/apache/kafka/clients/ClientUtils.java:
##########
@@ -201,6 +252,14 @@ public static NetworkClient 
createNetworkClient(AbstractConfig config,
                     metricsGroupPrefix,
                     channelBuilder,
                     logContext);
+            ClientDnsLookup dnsLookup = 
ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG));
+
+            NetworkClient.BootstrapConfiguration bootstrapConfiguration = 
NetworkClient.BootstrapConfiguration.enabled(
+                bootstrapServers != null ? bootstrapServers : List.of(),

Review Comment:
   Does it make sense to create an "enabled" config when `bootstrapServers` is `null` (which is silently turned into an empty list here)?



##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -1171,6 +1235,123 @@ private boolean isTelemetryApi(ApiKeys apiKey) {
         return apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == 
ApiKeys.PUSH_TELEMETRY;
     }
 
+    public static class BootstrapConfiguration {
+        public static final BootstrapConfiguration DISABLED =
+            new BootstrapConfiguration(List.of(), null, 0, 0);
+
+        public final List<String> bootstrapServers;
+        public final ClientDnsLookup clientDnsLookup;
+        public final long bootstrapResolveTimeoutMs;
+        public final long retryBackoffMs;
+
+        private BootstrapConfiguration(final List<String> bootstrapServers,
+                                       final ClientDnsLookup clientDnsLookup,
+                                       final long bootstrapResolveTimeoutMs,
+                                       final long retryBackoffMs) {
+            this.bootstrapServers = bootstrapServers;
+            this.clientDnsLookup = clientDnsLookup;
+            this.bootstrapResolveTimeoutMs = bootstrapResolveTimeoutMs;
+            this.retryBackoffMs = retryBackoffMs;
+        }
+
+        public static BootstrapConfiguration enabled(final List<String> 
bootstrapServers,
+                                                      final ClientDnsLookup 
clientDnsLookup,
+                                                      final long 
bootstrapResolveTimeoutMs,
+                                                      final long 
retryBackoffMs) {
+            return new BootstrapConfiguration(bootstrapServers, 
clientDnsLookup, bootstrapResolveTimeoutMs, retryBackoffMs);
+        }
+    }
+
+    /**
+     * Attempts to resolve bootstrap server addresses via DNS and create an 
initial bootstrap cluster.
+     * This method is called from {@link #poll(long, long)} and uses a truly 
asynchronous approach
+     * to avoid blocking on DNS resolution.
+     *
+     * <p>DNS resolution is performed on a separate thread via {@link 
CompletableFuture}. This ensures
+     * the event loop remains responsive even if DNS lookups block or take a 
long time. The bootstrap
+     * timeout can interrupt a pending DNS resolution, unlike synchronous 
approaches.
+     *
+     * @param currentTimeMs The current time in milliseconds
+     * @throws BootstrapResolutionException if the bootstrap timeout expires 
before DNS resolution succeeds
+     */
+    void ensureBootstrapped(final long currentTimeMs) {
+        if (bootstrapConfiguration == BootstrapConfiguration.DISABLED || 
metadataUpdater.isBootstrapped())
+            return;
+
+        if (Thread.interrupted()) {
+            cancelBootstrapResolution();
+            throw new InterruptException(new InterruptedException());
+        }
+
+        // Lazy initialization: create timer on first poll to ensure timeout 
starts when polling begins
+        if (bootstrapTimer == null) {
+            bootstrapTimer = 
time.timer(bootstrapConfiguration.bootstrapResolveTimeoutMs);
+        }
+
+        bootstrapTimer.update(currentTimeMs);
+        checkBootstrapTimeout();
+        handleAsyncBootstrapResolution();
+    }
+
+    /**
+     * Check if bootstrap timeout has expired and throw exception if so.
+     */
+    private void checkBootstrapTimeout() {
+        if (bootstrapTimer.isExpired()) {
+            cancelBootstrapResolution();
+            throw new BootstrapResolutionException("Failed to resolve 
bootstrap servers after " +
+                bootstrapConfiguration.bootstrapResolveTimeoutMs + "ms. " +
+                "Please check your bootstrap.servers configuration and DNS 
settings.");
+        }
+    }
+
+    /**
+     * Handle the async DNS resolution state machine.
+     * States: Not Started -> In Progress -> Completed (Success/Failure)
+     */
+    private void handleAsyncBootstrapResolution() {
+        if (pendingBootstrapResolution == null) {
+            pendingBootstrapResolution = CompletableFuture.supplyAsync(
+                () -> ClientUtils.parseAddresses(
+                    bootstrapConfiguration.bootstrapServers,
+                    bootstrapConfiguration.clientDnsLookup
+                ),
+                bootstrapExecutor
+            );
+            return;
+        }
+
+        // check again on next poll
+        if (!pendingBootstrapResolution.isDone()) {
+            return;
+        }
+
+        processBootstrapResolutionResult();
+    }
+
+    /**
+     * Process the completed bootstrap DNS resolution result.
+     */
+    private void processBootstrapResolutionResult() {
+        List<InetSocketAddress> servers = List.of();
+        try {
+            servers = pendingBootstrapResolution.getNow(List.of());
+        } catch (CompletionException e) {
+            log.debug("DNS resolution failed: {}", e.getCause().getMessage());
+        }
+
+        if (!servers.isEmpty()) {
+            log.debug("Bootstrap DNS resolution succeeded, {} servers 
resolved", servers.size());
+            metadataUpdater.bootstrap(servers);
+            pendingBootstrapResolution = null;
+            return;
+        }
+
+        // Failed - reset so next poll retries
+        log.warn("Failed to resolve bootstrap servers, will retry on next 
poll. Remaining time: {}ms", bootstrapTimer.remainingMs());

Review Comment:
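   Could this flood the log with warning messages? A failed resolution is reset so the next poll retries, which means this warning is emitted once per failed attempt until the bootstrap timeout expires.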



##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -1171,6 +1235,123 @@ private boolean isTelemetryApi(ApiKeys apiKey) {
         return apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == 
ApiKeys.PUSH_TELEMETRY;
     }
 
+    public static class BootstrapConfiguration {
+        public static final BootstrapConfiguration DISABLED =
+            new BootstrapConfiguration(List.of(), null, 0, 0);
+
+        public final List<String> bootstrapServers;
+        public final ClientDnsLookup clientDnsLookup;
+        public final long bootstrapResolveTimeoutMs;
+        public final long retryBackoffMs;
+
+        private BootstrapConfiguration(final List<String> bootstrapServers,
+                                       final ClientDnsLookup clientDnsLookup,
+                                       final long bootstrapResolveTimeoutMs,
+                                       final long retryBackoffMs) {
+            this.bootstrapServers = bootstrapServers;
+            this.clientDnsLookup = clientDnsLookup;
+            this.bootstrapResolveTimeoutMs = bootstrapResolveTimeoutMs;
+            this.retryBackoffMs = retryBackoffMs;
+        }
+
+        public static BootstrapConfiguration enabled(final List<String> 
bootstrapServers,
+                                                      final ClientDnsLookup 
clientDnsLookup,
+                                                      final long 
bootstrapResolveTimeoutMs,
+                                                      final long 
retryBackoffMs) {
+            return new BootstrapConfiguration(bootstrapServers, 
clientDnsLookup, bootstrapResolveTimeoutMs, retryBackoffMs);
+        }
+    }
+
+    /**
+     * Attempts to resolve bootstrap server addresses via DNS and create an 
initial bootstrap cluster.
+     * This method is called from {@link #poll(long, long)} and uses a truly 
asynchronous approach
+     * to avoid blocking on DNS resolution.
+     *
+     * <p>DNS resolution is performed on a separate thread via {@link 
CompletableFuture}. This ensures
+     * the event loop remains responsive even if DNS lookups block or take a 
long time. The bootstrap
+     * timeout can interrupt a pending DNS resolution, unlike synchronous 
approaches.
+     *
+     * @param currentTimeMs The current time in milliseconds
+     * @throws BootstrapResolutionException if the bootstrap timeout expires 
before DNS resolution succeeds
+     */
+    void ensureBootstrapped(final long currentTimeMs) {
+        if (bootstrapConfiguration == BootstrapConfiguration.DISABLED || 
metadataUpdater.isBootstrapped())
+            return;
+
+        if (Thread.interrupted()) {
+            cancelBootstrapResolution();
+            throw new InterruptException(new InterruptedException());
+        }
+
+        // Lazy initialization: create timer on first poll to ensure timeout 
starts when polling begins
+        if (bootstrapTimer == null) {
+            bootstrapTimer = 
time.timer(bootstrapConfiguration.bootstrapResolveTimeoutMs);
+        }
+
+        bootstrapTimer.update(currentTimeMs);
+        checkBootstrapTimeout();
+        handleAsyncBootstrapResolution();
+    }
+
+    /**
+     * Check if bootstrap timeout has expired and throw exception if so.
+     */
+    private void checkBootstrapTimeout() {
+        if (bootstrapTimer.isExpired()) {
+            cancelBootstrapResolution();
+            throw new BootstrapResolutionException("Failed to resolve 
bootstrap servers after " +
+                bootstrapConfiguration.bootstrapResolveTimeoutMs + "ms. " +
+                "Please check your bootstrap.servers configuration and DNS 
settings.");
+        }
+    }
+
+    /**
+     * Handle the async DNS resolution state machine.
+     * States: Not Started -> In Progress -> Completed (Success/Failure)
+     */
+    private void handleAsyncBootstrapResolution() {
+        if (pendingBootstrapResolution == null) {
+            pendingBootstrapResolution = CompletableFuture.supplyAsync(
+                () -> ClientUtils.parseAddresses(
+                    bootstrapConfiguration.bootstrapServers,
+                    bootstrapConfiguration.clientDnsLookup
+                ),
+                bootstrapExecutor
+            );
+            return;
+        }
+
+        // check again on next poll
+        if (!pendingBootstrapResolution.isDone()) {
+            return;
+        }
+
+        processBootstrapResolutionResult();
+    }
+
+    /**
+     * Process the completed bootstrap DNS resolution result.
+     */
+    private void processBootstrapResolutionResult() {
+        List<InetSocketAddress> servers = List.of();
+        try {
+            servers = pendingBootstrapResolution.getNow(List.of());
+        } catch (CompletionException e) {
+            log.debug("DNS resolution failed: {}", e.getCause().getMessage());

Review Comment:
   Could `e.getCause().getMessage()` throw an NPE if the `CompletionException` has no cause (i.e. `getCause()` returns `null`)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to