Nikita-Shupletsov commented on code in PR #21080:
URL: https://github.com/apache/kafka/pull/21080#discussion_r2818232114
##########
clients/src/main/java/org/apache/kafka/clients/ClientUtils.java:
##########
@@ -53,6 +53,44 @@ public final class ClientUtils {
private ClientUtils() {
}
+ public static List<InetSocketAddress> validateAddresses(List<String> urls, ClientDnsLookup clientDnsLookup) {
Review Comment:
Since the old method is called parseAndValidateAddresses, this one is probably
closer to parseAddresses: we omit the validation part, not the parsing part.
##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -1171,6 +1186,87 @@ private boolean isTelemetryApi(ApiKeys apiKey) {
return apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == ApiKeys.PUSH_TELEMETRY;
}
+ public static class BootstrapConfiguration {
+ public List<String> bootstrapServers;
+ public final ClientDnsLookup clientDnsLookup;
+ public final long bootstrapResolveTimeoutMs;
+ private boolean isBootstrapDisabled;
+
+ public BootstrapConfiguration(final List<String> bootstrapServers,
+ final ClientDnsLookup clientDnsLookup,
+ final long bootstrapResolveTimeoutMs) {
+ this.bootstrapServers = bootstrapServers;
+ this.clientDnsLookup = clientDnsLookup;
+ this.bootstrapResolveTimeoutMs = bootstrapResolveTimeoutMs;
+ this.isBootstrapDisabled = false;
+ }
+
+ public static BootstrapConfiguration disabled() {
+ BootstrapConfiguration bootstrapConfiguration = new BootstrapConfiguration(List.of(), null, 0);
+ bootstrapConfiguration.disableBootstrap();
+ return bootstrapConfiguration;
+ }
+
+ public void disableBootstrap() {
+ this.isBootstrapDisabled = true;
+ }
+ }
+
+ private class BootstrapState {
+ private final Timer timer;
+ private final List<String> bootstrapServers;
+ private final ClientDnsLookup clientDnsLookup;
+ private final long dnsResolutionTimeoutMs;
+ private final boolean isDisabled;
+
+ BootstrapState(BootstrapConfiguration bootstrapConfiguration) {
+ this.dnsResolutionTimeoutMs = bootstrapConfiguration.bootstrapResolveTimeoutMs;
+ this.timer = time.timer(bootstrapConfiguration.bootstrapResolveTimeoutMs);
+ this.bootstrapServers = bootstrapConfiguration.bootstrapServers;
+ this.clientDnsLookup = bootstrapConfiguration.clientDnsLookup;
+ this.isDisabled = bootstrapConfiguration.isBootstrapDisabled;
+ }
+
+ List<InetSocketAddress> tryResolveAddresses(final long currentTimeMs) {
+ timer.update(currentTimeMs);
+ List<InetSocketAddress> addresses = ClientUtils.validateAddresses(bootstrapServers, clientDnsLookup);
+ if (!addresses.isEmpty()) {
+ timer.reset(dnsResolutionTimeoutMs);
+ return addresses;
+ }
+
+ if (timer.isExpired()) {
+ throw new BootstrapResolutionException("Timeout while attempting to resolve bootstrap " +
+ "servers. ");
+ }
+ return ClientUtils.validateAddresses(bootstrapServers, clientDnsLookup);
+ }
+
+ boolean isDisabled() {
+ return isDisabled;
+ }
+
+ boolean isTimerExpired() {
+ return timer.isExpired();
+ }
+ }
+
+ void ensureBootstrapped(final long currentTimeMs) {
Review Comment:
Is this method intended to be called multiple times? Or is it supposed to block
until it bootstraps the metadata or the timer expires?
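To make the two readings concrete, here is a rough sketch of what each contract could look like. It is not taken from the PR: `BootstrapContractSketch`, `tryBootstrap`, `awaitBootstrap`, and the `Supplier`-based resolver are all hypothetical names, and the resolver only stands in for DNS resolution. The first method is meant to be called once per poll and returns immediately; the second blocks by looping over the first.

```java
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical illustration only; none of these names exist in NetworkClient.
public final class BootstrapContractSketch {
    private final Supplier<List<String>> resolver; // stand-in for DNS resolution
    private final long deadlineMs;
    private boolean bootstrapped;

    public BootstrapContractSketch(Supplier<List<String>> resolver, long startMs, long timeoutMs) {
        this.resolver = resolver;
        this.deadlineMs = startMs + timeoutMs;
    }

    // Reading 1: non-blocking. The caller invokes this on every poll.
    // Returns true once bootstrapped, false if it should be retried later,
    // and throws once the deadline has passed without a successful resolution.
    public boolean tryBootstrap(long nowMs) {
        if (bootstrapped) {
            return true;
        }
        if (!resolver.get().isEmpty()) {
            bootstrapped = true;
            return true;
        }
        if (nowMs >= deadlineMs) {
            throw new IllegalStateException("Timed out resolving bootstrap servers");
        }
        return false;
    }

    // Reading 2: blocking. Returns only after a successful bootstrap,
    // or propagates the timeout exception raised by tryBootstrap.
    public void awaitBootstrap(LongSupplier clock, long backoffMs) throws InterruptedException {
        while (!tryBootstrap(clock.getAsLong())) {
            Thread.sleep(backoffMs);
        }
    }
}
```

Whichever reading is intended, a short Javadoc on ensureBootstrapped stating it would answer the question for future readers.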
##########
clients/src/main/java/org/apache/kafka/clients/ClientUtils.java:
##########
Review Comment:
This method and the new one look alike. Would it be possible to extract the
shared part and reuse it in both, to prevent the implementations from drifting
over time?
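One possible shape for that, as a simplified sketch rather than the real ClientUtils code: the strict method delegates to the lenient one and only adds the final emptiness check. The class name `AddressParsingSketch` is hypothetical, the `ClientDnsLookup` parameter is left out, and the host:port parsing is deliberately naive (no bracketed IPv6 handling), so treat this as an illustration of the structure only.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ClientUtils; not the actual Kafka implementation.
public final class AddressParsingSketch {
    private AddressParsingSketch() {
    }

    // Strict variant: reuses the shared parsing and fails if nothing resolved.
    public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
        List<InetSocketAddress> addresses = parseAddresses(urls);
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("No resolvable bootstrap urls given");
        }
        return addresses;
    }

    // Lenient variant and shared part: returns whatever resolved, possibly nothing.
    public static List<InetSocketAddress> parseAddresses(List<String> urls) {
        List<InetSocketAddress> addresses = new ArrayList<>();
        for (String url : urls) {
            if (url == null || url.isEmpty()) {
                continue;
            }
            // Naive host:port split; assumes no bracketed IPv6 literals.
            int colon = url.lastIndexOf(':');
            if (colon <= 0 || colon == url.length() - 1) {
                throw new IllegalArgumentException("Invalid url: " + url);
            }
            String host = url.substring(0, colon);
            int port = Integer.parseInt(url.substring(colon + 1));
            InetSocketAddress address = new InetSocketAddress(host, port);
            // Unresolved hosts are skipped rather than treated as errors.
            if (!address.isUnresolved()) {
                addresses.add(address);
            }
        }
        return addresses;
    }
}
```

With this layout the only difference between the two entry points is the final check, so any future change to the parsing lands in one place.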