li4wang commented on code in PR #2320: URL: https://github.com/apache/zookeeper/pull/2320#discussion_r2678196385
########## zookeeper-server/src/main/java/org/apache/zookeeper/client/DnsSrvHostProvider.java: ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.client; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.common.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xbill.DNS.Lookup; +import org.xbill.DNS.Record; +import org.xbill.DNS.SRVRecord; +import org.xbill.DNS.Type; + +/** + * DNS SRV-based HostProvider that dynamically resolves host port names from DNS SRV records. + * + * <p>This implementation periodically refreshes the server list by querying DNS SRV records + * and uses HostConnectionManager for all connection management and reconfiguration logic.</p> + * + * <p><strong>Two-Phase Update Strategy:</strong></p> + * <ul> + * <li><strong>Phase 1 (Background):</strong> Timer thread detects DNS changes without caching results</li> + * <li><strong>Phase 2 (Connection-time):</strong> Fresh DNS lookup during connect attempt if changes detected</li> + * </ul> + */ [email protected] +public final class DnsSrvHostProvider implements HostProvider { + private static final Logger LOG = LoggerFactory.getLogger(DnsSrvHostProvider.class); + + public interface DnsSrvResolver { + SRVRecord[] lookupSrvRecords(String dnsSrvName) throws IOException; + } + + private final String dnsSrvName; + private final DnsSrvResolver dnsResolver; + + private final HostConnectionManager connectionManager; + // Track the previous server list to detect changes + private volatile Set<InetSocketAddress> previousServerSet; + private Timer dnsRefreshTimer; + + // Track the current connected host for accurate load balancing decisions + private final AtomicReference<InetSocketAddress> currentConnectedHost = new AtomicReference<>(); + private final AtomicBoolean serverListChanged = new AtomicBoolean(false); + + /** + * Constructs a DnsSrvHostProvider with the given DNS name + * + * @param dnsSrvName the DNS name to query for SRV records + * @throws IllegalArgumentException if dnsSrvName is null or empty or invalid + * or if no SRV records are found for the DNS name + * or if DNS lookup fails + */ + public DnsSrvHostProvider(final String dnsSrvName) { + this(dnsSrvName, null); + } + + + /** + * Constructs a DnsSrvHostProvider with the given DNS name and ZKClientConfig + * + * @param dnsSrvName the DNS name to query for SRV records + * @param clientConfig ZooKeeper client configuration + * @throws IllegalArgumentException if dnsSrvName is null or empty or invalid + * or if no SRV records are found for the DNS name + * or if DNS lookup fails + */ + public DnsSrvHostProvider(final String dnsSrvName, final ZKClientConfig clientConfig) { + this(dnsSrvName, System.currentTimeMillis() ^ dnsSrvName.hashCode(), clientConfig); + } + + /** + * Constructs a DnsSrvHostProvider with the given DNS name, randomness seed and ZKClientConfig + * + * @param dnsSrvName the DNS name to query for SRV records + * @param randomnessSeed seed for randomization + * @param clientConfig ZooKeeper client configuration + * @throws IllegalArgumentException if dnsSrvName is null or empty or invalid + * or if no SRV records are found for the DNS name + * or if DNS lookup fails + */ + public DnsSrvHostProvider(final String dnsSrvName, final long randomnessSeed, final ZKClientConfig clientConfig) { + this(dnsSrvName, randomnessSeed, new DefaultDnsResolver(), clientConfig); + } + + /** + * Constructs a DnsSrvHostProvider with the given DNS name, randomization seed, DNS resolver and ZKClientConfig + * + * @param dnsSrvName the DNS name to query for SRV records + * @param randomnessSeed seed for randomization + * @param dnsResolver custom DNS resolver + * @param clientConfig ZooKeeper client configuration + * @throws IllegalArgumentException if dnsSrvName is null or empty or invalid + * or if no SRV records are found for the DNS name + * or if DNS lookup fails + */ + public DnsSrvHostProvider(final String dnsSrvName, final long randomnessSeed, final DnsSrvResolver dnsResolver, final ZKClientConfig clientConfig) { + if (StringUtils.isBlank(dnsSrvName)) { + throw new IllegalArgumentException("DNS name cannot be null or empty"); + } + + this.dnsSrvName = dnsSrvName; + this.dnsResolver = dnsResolver; + try { + final List<InetSocketAddress> serverAddresses = lookupDnsSrvRecords(); + if (serverAddresses.isEmpty()) { + throw new IllegalArgumentException("No SRV records found for DNS name: " + dnsSrvName); + } + + this.connectionManager = new HostConnectionManager(serverAddresses, randomnessSeed, clientConfig); + this.previousServerSet = new HashSet<>(serverAddresses); + + + final long refreshIntervalInSeconds = getRefreshInterval(); + if (refreshIntervalInSeconds > 0) { + dnsRefreshTimer = new Timer("DnsSrvRefresh-" + dnsSrvName, true); + dnsRefreshTimer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + refreshServerListInBackground(); + } + }, + refreshIntervalInSeconds * 1000, + refreshIntervalInSeconds * 1000); + } + + LOG.info("DnsSrvHostProvider initialized with {} servers from DNS name: {} with refresh interval: {} seconds", + serverAddresses.size(), dnsSrvName, refreshIntervalInSeconds); + } catch (final Exception e) { + LOG.error("Failed to initialize DnsSrvHostProvider for DNS name: {}", dnsSrvName, e); + + if (dnsRefreshTimer != null) { + dnsRefreshTimer.cancel(); + } + + if (e instanceof IllegalArgumentException) { + throw e; + } else { + throw new IllegalArgumentException("Failed to initialize DnsSrvHostProvider for DNS name: " + dnsSrvName, e); + } + } + } + + @Override + public int size() { + return connectionManager.size(); + } + + @Override + public InetSocketAddress next(long spinDelay) { + applyServerListUpdate(); + return connectionManager.next(spinDelay); + } + + @Override + public void onConnected() { + currentConnectedHost.set(connectionManager.getServerAtCurrentIndex()); + connectionManager.onConnected(); + } + + @Override + public boolean updateServerList(Collection<InetSocketAddress> serverAddresses, InetSocketAddress currentHost) { + return connectionManager.updateServerList(serverAddresses, currentHost); + } + + @Override + public void close() { + if (dnsRefreshTimer != null) { + dnsRefreshTimer.cancel(); + } + } + + private long getRefreshInterval() { + final long defaultInterval = ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_SECONDS_DEFAULT; + final long interval = Long.getLong(ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_SECONDS, defaultInterval); + if (interval < 0) { + LOG.warn("Invalid DNS SRV refresh interval {} seconds, using default {} seconds", interval, defaultInterval); + return defaultInterval; + } + if (interval == 0) { + LOG.info("DNS SRV refresh disabled (interval = 0) for {}", dnsSrvName); + } + return interval; + } + + private List<InetSocketAddress> lookupDnsSrvRecords() { + final List<InetSocketAddress> addresses = new ArrayList<>(); + + try { + final SRVRecord[] srvRecords = dnsResolver.lookupSrvRecords(dnsSrvName); + if (srvRecords.length == 0) { + LOG.warn("No SRV records found for DNS name: {}", dnsSrvName); + return addresses; + } + + for (final SRVRecord srvRecord : srvRecords) { + try { + final InetSocketAddress address = createAddressFromSrvRecord(srvRecord); + if (address != null) { + addresses.add(address); + } + } catch (final Exception e) { + LOG.warn("Failed to create address from SRV record {}: {}", srvRecord, e.getMessage()); Review Comment: updated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
