chia7712 commented on code in PR #21080:
URL: https://github.com/apache/kafka/pull/21080#discussion_r3212666275
##########
clients/src/main/java/org/apache/kafka/clients/ClientUtils.java:
##########
@@ -201,6 +252,14 @@ public static NetworkClient
createNetworkClient(AbstractConfig config,
metricsGroupPrefix,
channelBuilder,
logContext);
+ ClientDnsLookup dnsLookup =
ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG));
+
+ NetworkClient.BootstrapConfiguration bootstrapConfiguration =
NetworkClient.BootstrapConfiguration.enabled(
+ bootstrapServers != null ? bootstrapServers : List.of(),
Review Comment:
Does it make sense to create an "enabled config" with `null` brokers?
##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -1171,6 +1235,123 @@ private boolean isTelemetryApi(ApiKeys apiKey) {
return apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey ==
ApiKeys.PUSH_TELEMETRY;
}
+ public static class BootstrapConfiguration {
+ public static final BootstrapConfiguration DISABLED =
+ new BootstrapConfiguration(List.of(), null, 0, 0);
+
+ public final List<String> bootstrapServers;
+ public final ClientDnsLookup clientDnsLookup;
+ public final long bootstrapResolveTimeoutMs;
+ public final long retryBackoffMs;
+
+ private BootstrapConfiguration(final List<String> bootstrapServers,
+ final ClientDnsLookup clientDnsLookup,
+ final long bootstrapResolveTimeoutMs,
+ final long retryBackoffMs) {
+ this.bootstrapServers = bootstrapServers;
+ this.clientDnsLookup = clientDnsLookup;
+ this.bootstrapResolveTimeoutMs = bootstrapResolveTimeoutMs;
+ this.retryBackoffMs = retryBackoffMs;
+ }
+
+ public static BootstrapConfiguration enabled(final List<String>
bootstrapServers,
+ final ClientDnsLookup
clientDnsLookup,
+ final long
bootstrapResolveTimeoutMs,
+ final long
retryBackoffMs) {
+ return new BootstrapConfiguration(bootstrapServers,
clientDnsLookup, bootstrapResolveTimeoutMs, retryBackoffMs);
+ }
+ }
+
+ /**
+ * Attempts to resolve bootstrap server addresses via DNS and create an
initial bootstrap cluster.
+ * This method is called from {@link #poll(long, long)} and uses a truly
asynchronous approach
+ * to avoid blocking on DNS resolution.
+ *
+ * <p>DNS resolution is performed on a separate thread via {@link
CompletableFuture}. This ensures
+ * the event loop remains responsive even if DNS lookups block or take a
long time. The bootstrap
+ * timeout can interrupt a pending DNS resolution, unlike synchronous
approaches.
+ *
+ * @param currentTimeMs The current time in milliseconds
+ * @throws BootstrapResolutionException if the bootstrap timeout expires
before DNS resolution succeeds
+ */
+ void ensureBootstrapped(final long currentTimeMs) {
+ if (bootstrapConfiguration == BootstrapConfiguration.DISABLED ||
metadataUpdater.isBootstrapped())
+ return;
+
+ if (Thread.interrupted()) {
+ cancelBootstrapResolution();
+ throw new InterruptException(new InterruptedException());
+ }
+
+ // Lazy initialization: create timer on first poll to ensure timeout
starts when polling begins
+ if (bootstrapTimer == null) {
+ bootstrapTimer =
time.timer(bootstrapConfiguration.bootstrapResolveTimeoutMs);
+ }
+
+ bootstrapTimer.update(currentTimeMs);
+ checkBootstrapTimeout();
+ handleAsyncBootstrapResolution();
+ }
+
+ /**
+ * Check if bootstrap timeout has expired and throw exception if so.
+ */
+ private void checkBootstrapTimeout() {
+ if (bootstrapTimer.isExpired()) {
+ cancelBootstrapResolution();
+ throw new BootstrapResolutionException("Failed to resolve
bootstrap servers after " +
+ bootstrapConfiguration.bootstrapResolveTimeoutMs + "ms. " +
+ "Please check your bootstrap.servers configuration and DNS
settings.");
+ }
+ }
+
+ /**
+ * Handle the async DNS resolution state machine.
+ * States: Not Started -> In Progress -> Completed (Success/Failure)
+ */
+ private void handleAsyncBootstrapResolution() {
+ if (pendingBootstrapResolution == null) {
+ pendingBootstrapResolution = CompletableFuture.supplyAsync(
+ () -> ClientUtils.parseAddresses(
+ bootstrapConfiguration.bootstrapServers,
+ bootstrapConfiguration.clientDnsLookup
+ ),
+ bootstrapExecutor
+ );
+ return;
+ }
+
+ // check again on next poll
+ if (!pendingBootstrapResolution.isDone()) {
+ return;
+ }
+
+ processBootstrapResolutionResult();
+ }
+
+ /**
+ * Process the completed bootstrap DNS resolution result.
+ */
+ private void processBootstrapResolutionResult() {
+ List<InetSocketAddress> servers = List.of();
+ try {
+ servers = pendingBootstrapResolution.getNow(List.of());
+ } catch (CompletionException e) {
+ log.debug("DNS resolution failed: {}", e.getCause().getMessage());
+ }
+
+ if (!servers.isEmpty()) {
+ log.debug("Bootstrap DNS resolution succeeded, {} servers
resolved", servers.size());
+ metadataUpdater.bootstrap(servers);
+ pendingBootstrapResolution = null;
+ return;
+ }
+
+ // Failed - reset so next poll retries
+ log.warn("Failed to resolve bootstrap servers, will retry on next
poll. Remaining time: {}ms", bootstrapTimer.remainingMs());
Review Comment:
Does it flood the log with warning messages?
##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -1171,6 +1235,123 @@ private boolean isTelemetryApi(ApiKeys apiKey) {
return apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey ==
ApiKeys.PUSH_TELEMETRY;
}
+ public static class BootstrapConfiguration {
+ public static final BootstrapConfiguration DISABLED =
+ new BootstrapConfiguration(List.of(), null, 0, 0);
+
+ public final List<String> bootstrapServers;
+ public final ClientDnsLookup clientDnsLookup;
+ public final long bootstrapResolveTimeoutMs;
+ public final long retryBackoffMs;
+
+ private BootstrapConfiguration(final List<String> bootstrapServers,
+ final ClientDnsLookup clientDnsLookup,
+ final long bootstrapResolveTimeoutMs,
+ final long retryBackoffMs) {
+ this.bootstrapServers = bootstrapServers;
+ this.clientDnsLookup = clientDnsLookup;
+ this.bootstrapResolveTimeoutMs = bootstrapResolveTimeoutMs;
+ this.retryBackoffMs = retryBackoffMs;
+ }
+
+ public static BootstrapConfiguration enabled(final List<String>
bootstrapServers,
+ final ClientDnsLookup
clientDnsLookup,
+ final long
bootstrapResolveTimeoutMs,
+ final long
retryBackoffMs) {
+ return new BootstrapConfiguration(bootstrapServers,
clientDnsLookup, bootstrapResolveTimeoutMs, retryBackoffMs);
+ }
+ }
+
+ /**
+ * Attempts to resolve bootstrap server addresses via DNS and create an
initial bootstrap cluster.
+ * This method is called from {@link #poll(long, long)} and uses a truly
asynchronous approach
+ * to avoid blocking on DNS resolution.
+ *
+ * <p>DNS resolution is performed on a separate thread via {@link
CompletableFuture}. This ensures
+ * the event loop remains responsive even if DNS lookups block or take a
long time. The bootstrap
+ * timeout can interrupt a pending DNS resolution, unlike synchronous
approaches.
+ *
+ * @param currentTimeMs The current time in milliseconds
+ * @throws BootstrapResolutionException if the bootstrap timeout expires
before DNS resolution succeeds
+ */
+ void ensureBootstrapped(final long currentTimeMs) {
+ if (bootstrapConfiguration == BootstrapConfiguration.DISABLED ||
metadataUpdater.isBootstrapped())
+ return;
+
+ if (Thread.interrupted()) {
+ cancelBootstrapResolution();
+ throw new InterruptException(new InterruptedException());
+ }
+
+ // Lazy initialization: create timer on first poll to ensure timeout
starts when polling begins
+ if (bootstrapTimer == null) {
+ bootstrapTimer =
time.timer(bootstrapConfiguration.bootstrapResolveTimeoutMs);
+ }
+
+ bootstrapTimer.update(currentTimeMs);
+ checkBootstrapTimeout();
+ handleAsyncBootstrapResolution();
+ }
+
+ /**
+ * Check if bootstrap timeout has expired and throw exception if so.
+ */
+ private void checkBootstrapTimeout() {
+ if (bootstrapTimer.isExpired()) {
+ cancelBootstrapResolution();
+ throw new BootstrapResolutionException("Failed to resolve
bootstrap servers after " +
+ bootstrapConfiguration.bootstrapResolveTimeoutMs + "ms. " +
+ "Please check your bootstrap.servers configuration and DNS
settings.");
+ }
+ }
+
+ /**
+ * Handle the async DNS resolution state machine.
+ * States: Not Started -> In Progress -> Completed (Success/Failure)
+ */
+ private void handleAsyncBootstrapResolution() {
+ if (pendingBootstrapResolution == null) {
+ pendingBootstrapResolution = CompletableFuture.supplyAsync(
+ () -> ClientUtils.parseAddresses(
+ bootstrapConfiguration.bootstrapServers,
+ bootstrapConfiguration.clientDnsLookup
+ ),
+ bootstrapExecutor
+ );
+ return;
+ }
+
+ // check again on next poll
+ if (!pendingBootstrapResolution.isDone()) {
+ return;
+ }
+
+ processBootstrapResolutionResult();
+ }
+
+ /**
+ * Process the completed bootstrap DNS resolution result.
+ */
+ private void processBootstrapResolutionResult() {
+ List<InetSocketAddress> servers = List.of();
+ try {
+ servers = pendingBootstrapResolution.getNow(List.of());
+ } catch (CompletionException e) {
+ log.debug("DNS resolution failed: {}", e.getCause().getMessage());
Review Comment:
`e.getCause().getMessage()` - would it cause an NPE?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]