Repository: kudu Updated Branches: refs/heads/master 0c52f161b -> 0c44223ea
[java client] Extract ip2client out of AsyncKuduClient As part of making the Java client more modular and easier to test, this patch is moving almost all of the connections management into a separate class. The change was rather painless. Change-Id: I48b0f3f262abd5ee26869202f661b3c25158f39c Reviewed-on: http://gerrit.cloudera.org:8080/4717 Tested-by: Kudu Jenkins Reviewed-by: Adar Dembo <a...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e5b7ebf8 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e5b7ebf8 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e5b7ebf8 Branch: refs/heads/master Commit: e5b7ebf8daa4b17a475814e62ed87d9dff4f6095 Parents: 0c52f16 Author: Jean-Daniel Cryans <jdcry...@apache.org> Authored: Thu Oct 13 12:36:23 2016 -0700 Committer: Jean-Daniel Cryans <jdcry...@apache.org> Committed: Fri Oct 14 21:38:26 2016 +0000 ---------------------------------------------------------------------- .../org/apache/kudu/client/AsyncKuduClient.java | 373 +---------------- .../org/apache/kudu/client/ConnectionCache.java | 395 +++++++++++++++++++ .../kudu/client/TestAsyncKuduSession.java | 4 +- 3 files changed, 417 insertions(+), 355 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/e5b7ebf8/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index cbb128e..9b5ba5b 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -37,9 +37,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.Message; import com.stumbleupon.async.Callback; import com.stumbleupon.async.Deferred; - -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.socket.nio.NioWorkerPool; import org.apache.kudu.Common; import org.apache.kudu.Schema; import org.apache.kudu.annotations.InterfaceAudience; @@ -51,14 +48,10 @@ import org.apache.kudu.util.AsyncUtil; import org.apache.kudu.util.NetUtil; import org.apache.kudu.util.Pair; import org.apache.kudu.util.Slice; -import org.jboss.netty.channel.ChannelEvent; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.DefaultChannelPipeline; +import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.channel.socket.SocketChannel; -import org.jboss.netty.channel.socket.SocketChannelConfig; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.handler.timeout.ReadTimeoutHandler; +import org.jboss.netty.channel.socket.nio.NioWorkerPool; import org.jboss.netty.util.HashedWheelTimer; import org.jboss.netty.util.Timeout; import org.jboss.netty.util.TimerTask; @@ -66,19 +59,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.concurrent.GuardedBy; - -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.net.UnknownHostException; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Random; import java.util.Set; import java.util.UUID; @@ -86,7 +73,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -148,33 +134,7 @@ public class AsyncKuduClient implements AutoCloseable { private final ConcurrentHashMap<String, TableLocationsCache> tableLocations = new ConcurrentHashMap<>(); - /** - * Cache that maps a TabletServer address ("ip:port") to the clients - * connected to it. - * <p> - * Access to this map must be synchronized by locking its monitor. - * Logging the contents of this map (or calling toString) requires copying it first. - * <p> - * This isn't a {@link ConcurrentHashMap} because we don't use it frequently - * (just when connecting to / disconnecting from TabletClients) and when we - * add something to it, we want to do an atomic get-and-put, but - * {@code putIfAbsent} isn't a good fit for us since it requires to create - * an object that may be "wasted" in case another thread wins the insertion - * race, and we don't want to create unnecessary connections. - * <p> - * Upon disconnection, clients are automatically removed from this map. - * We don't use a {@code ChannelGroup} because a {@code ChannelGroup} does - * the clean-up on the {@code channelClosed} event, which is actually the - * 3rd and last event to be fired when a channel gets disconnected. The - * first one to get fired is, {@code channelDisconnected}. This matters to - * us because we want to purge disconnected clients from the cache as - * quickly as possible after the disconnection, to avoid handing out clients - * that are going to cause unnecessary errors. - * @see TabletClientPipeline#handleDisconnect - */ - @VisibleForTesting - @GuardedBy("ip2client") - final HashMap<String, TabletClient> ip2client = new HashMap<>(); + private final ConnectionCache connectionCache; @GuardedBy("sessions") private final Set<AsyncKuduSession> sessions = new HashSet<>(); @@ -241,6 +201,7 @@ public class AsyncKuduClient implements AutoCloseable { this.timer = b.timer; String clientId = UUID.randomUUID().toString().replace("-", ""); this.requestTracker = new RequestTracker(clientId); + this.connectionCache = new ConnectionCache(this); } /** @@ -579,6 +540,14 @@ public class AsyncKuduClient implements AutoCloseable { return requestTracker; } + HashedWheelTimer getTimer() { + return timer; + } + + ClientSocketChannelFactory getChannelFactory() { + return channelFactory; + } + /** * Creates a new {@link AsyncKuduScanner.AsyncKuduScannerBuilder} for a particular table. * @param table the name of the table you intend to scan. @@ -958,13 +927,11 @@ public class AsyncKuduClient implements AutoCloseable { * but calling certain methods on the returned TabletClients can. For example, * it's possible to forcefully shutdown a connection to a tablet server by calling {@link * TabletClient#shutdown()}. - * @return Copy of the current TabletClients list + * @return copy of the current TabletClients list */ @VisibleForTesting List<TabletClient> getTabletClients() { - synchronized (ip2client) { - return new ArrayList<TabletClient>(ip2client.values()); - } + return connectionCache.getTabletClients(); } /** @@ -1476,7 +1443,7 @@ public class AsyncKuduClient implements AutoCloseable { * @return A live and initialized client for the specified master server. */ TabletClient newMasterClient(HostAndPort masterHostPort) { - String ip = getIP(masterHostPort.getHostText()); + String ip = ConnectionCache.getIP(masterHostPort.getHostText()); if (ip == null) { return null; } @@ -1484,36 +1451,11 @@ public class AsyncKuduClient implements AutoCloseable { // communicate with the masters to find out about them, and that's what we're trying to do. // The UUID is used for logging, so instead we're passing the "master table name" followed by // host and port which is enough to identify the node we're connecting to. - return newClient(MASTER_TABLE_NAME_PLACEHOLDER + " - " + masterHostPort.toString(), + return connectionCache.newClient( + MASTER_TABLE_NAME_PLACEHOLDER + " - " + masterHostPort.toString(), ip, masterHostPort.getPort()); } - TabletClient newClient(String uuid, final String host, final int port) { - final String hostport = host + ':' + port; - TabletClient client; - SocketChannel chan; - synchronized (ip2client) { - client = ip2client.get(hostport); - if (client != null && client.isAlive()) { - return client; - } - final TabletClientPipeline pipeline = new TabletClientPipeline(); - client = pipeline.init(uuid, host, port); - chan = channelFactory.newChannel(pipeline); - TabletClient oldClient = ip2client.put(hostport, client); - assert oldClient == null; - } - final SocketChannelConfig config = chan.getConfig(); - config.setConnectTimeoutMillis(5000); - config.setTcpNoDelay(true); - // Unfortunately there is no way to override the keep-alive timeout in - // Java since the JRE doesn't expose any way to call setsockopt() with - // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh. - config.setKeepAlive(true); - chan.connect(new InetSocketAddress(host, port)); // Won't block. - return client; - } - /** * Invokes {@link #shutdown()} and waits for the configured admin timeout. This method returns * void, so consider invoking shutdown directly if there's a need to handle dangling RPCs. @@ -1577,7 +1519,7 @@ public class AsyncKuduClient implements AutoCloseable { final class DisconnectCB implements Callback<Deferred<ArrayList<Void>>, ArrayList<List<OperationResponse>>> { public Deferred<ArrayList<Void>> call(ArrayList<List<OperationResponse>> ignoredResponses) { - return disconnectEverything().addCallback(new ReleaseResourcesCB()); + return connectionCache.disconnectEverything().addCallback(new ReleaseResourcesCB()); } public String toString() { return "disconnect callback"; @@ -1614,287 +1556,12 @@ public class AsyncKuduClient implements AutoCloseable { return Deferred.group(deferreds); } - /** - * Closes every socket, which will also cancel all the RPCs in flight. - */ - private Deferred<ArrayList<Void>> disconnectEverything() { - ArrayList<Deferred<Void>> deferreds = - new ArrayList<Deferred<Void>>(2); - HashMap<String, TabletClient> ip2client_copy; - synchronized (ip2client) { - // Make a local copy so we can shutdown every Tablet Server clients - // without hold the lock while we iterate over the data structure. - ip2client_copy = new HashMap<String, TabletClient>(ip2client); - } - - for (TabletClient ts : ip2client_copy.values()) { - deferreds.add(ts.shutdown()); - } - final int size = deferreds.size(); - return Deferred.group(deferreds).addCallback( - new Callback<ArrayList<Void>, ArrayList<Void>>() { - public ArrayList<Void> call(final ArrayList<Void> arg) { - // Normally, now that we've shutdown() every client, all our caches should - // be empty since each shutdown() generates a DISCONNECTED event, which - // causes TabletClientPipeline to call removeClientFromIpCache(). - HashMap<String, TabletClient> logme = null; - synchronized (ip2client) { - if (!ip2client.isEmpty()) { - logme = new HashMap<String, TabletClient>(ip2client); - } - } - if (logme != null) { - // Putting this logging statement inside the synchronized block - // can lead to a deadlock, since HashMap.toString() is going to - // call TabletClient.toString() on each entry, and this locks the - // client briefly. Other parts of the code lock clients first and - // the ip2client HashMap second, so this can easily deadlock. - LOG.error("Some clients are left in the client cache and haven't" - + " been cleaned up: " + logme); - } - return arg; - } - - public String toString() { - return "wait " + size + " TabletClient.shutdown()"; - } - }); - } - - /** - * Blocking call. - * Performs a slow search of the IP used by the given client. - * <p> - * This is needed when we're trying to find the IP of the client before its - * channel has successfully connected, because Netty's API offers no way of - * retrieving the IP of the remote peer until we're connected to it. - * @param client The client we want the IP of. - * @return The IP of the client, or {@code null} if we couldn't find it. - */ - private InetSocketAddress slowSearchClientIP(final TabletClient client) { - String hostport = null; - synchronized (ip2client) { - for (final Map.Entry<String, TabletClient> e : ip2client.entrySet()) { - if (e.getValue() == client) { - hostport = e.getKey(); - break; - } - } - } - - if (hostport == null) { - HashMap<String, TabletClient> copy; - synchronized (ip2client) { - copy = new HashMap<String, TabletClient>(ip2client); - } - LOG.error("WTF? Should never happen! Couldn't find " + client - + " in " + copy); - return null; - } - final int colon = hostport.indexOf(':', 1); - if (colon < 1) { - LOG.error("WTF? Should never happen! No `:' found in " + hostport); - return null; - } - final String host = getIP(hostport.substring(0, colon)); - if (host == null) { - // getIP will print the reason why, there's nothing else we can do. - return null; - } - - int port; - try { - port = parsePortNumber(hostport.substring(colon + 1, - hostport.length())); - } catch (NumberFormatException e) { - LOG.error("WTF? Should never happen! Bad port in " + hostport, e); - return null; - } - return new InetSocketAddress(host, port); - } - - /** - * Removes the given client from the `ip2client` cache. - * @param client The client for which we must clear the ip cache - * @param remote The address of the remote peer, if known, or null - */ - private void removeClientFromIpCache(final TabletClient client, - final SocketAddress remote) { - - if (remote == null) { - return; // Can't continue without knowing the remote address. - } - - String hostport; - if (remote instanceof InetSocketAddress) { - final InetSocketAddress sock = (InetSocketAddress) remote; - final InetAddress addr = sock.getAddress(); - if (addr == null) { - LOG.error("WTF? Unresolved IP for " + remote - + ". This shouldn't happen."); - return; - } else { - hostport = addr.getHostAddress() + ':' + sock.getPort(); - } - } else { - LOG.error("WTF? Found a non-InetSocketAddress remote: " + remote - + ". This shouldn't happen."); - return; - } - - TabletClient old; - synchronized (ip2client) { - old = ip2client.remove(hostport); - } - - LOG.debug("Removed from IP cache: {" + hostport + "} -> {" + client + "}"); - if (old == null) { - // Currently we're seeing this message when masters are disconnected and the hostport we got - // above is different than the one the user passes (that we use to populate ip2client). At - // worst this doubles the entries for masters, which has an insignificant impact. - // TODO When fixed, make this a WARN again. - LOG.trace("When expiring " + client + " from the client cache (host:port=" - + hostport + "), it was found that there was no entry" - + " corresponding to " + remote + ". This shouldn't happen."); - } - } - private boolean isMasterTable(String tableId) { // Checking that it's the same instance so there's absolutely no chance of confusing the master // 'table' for a user one. return MASTER_TABLE_NAME_PLACEHOLDER == tableId; } - private final class TabletClientPipeline extends DefaultChannelPipeline { - - private final Logger log = LoggerFactory.getLogger(TabletClientPipeline.class); - /** - * Have we already disconnected?. - * We use this to avoid doing the cleanup work for the same client more - * than once, even if we get multiple events indicating that the client - * is no longer connected to the TabletServer (e.g. DISCONNECTED, CLOSED). - * No synchronization needed as this is always accessed from only one - * thread at a time (equivalent to a non-shared state in a Netty handler). - */ - private boolean disconnected = false; - - TabletClient init(String uuid, String host, int port) { - final TabletClient client = new TabletClient(AsyncKuduClient.this, uuid, host, port); - if (defaultSocketReadTimeoutMs > 0) { - super.addLast("timeout-handler", - new ReadTimeoutHandler(timer, - defaultSocketReadTimeoutMs, - TimeUnit.MILLISECONDS)); - } - super.addLast("kudu-handler", client); - - return client; - } - - @Override - public void sendDownstream(final ChannelEvent event) { - if (event instanceof ChannelStateEvent) { - handleDisconnect((ChannelStateEvent) event); - } - super.sendDownstream(event); - } - - @Override - public void sendUpstream(final ChannelEvent event) { - if (event instanceof ChannelStateEvent) { - handleDisconnect((ChannelStateEvent) event); - } - super.sendUpstream(event); - } - - private void handleDisconnect(final ChannelStateEvent state_event) { - if (disconnected) { - return; - } - switch (state_event.getState()) { - case OPEN: - if (state_event.getValue() == Boolean.FALSE) { - break; // CLOSED - } - return; - case CONNECTED: - if (state_event.getValue() == null) { - break; // DISCONNECTED - } - return; - default: - return; // Not an event we're interested in, ignore it. - } - - disconnected = true; // So we don't clean up the same client twice. - try { - final TabletClient client = super.get(TabletClient.class); - SocketAddress remote = super.getChannel().getRemoteAddress(); - // At this point Netty gives us no easy way to access the - // SocketAddress of the peer we tried to connect to. This - // kinda sucks but I couldn't find an easier way. - if (remote == null) { - remote = slowSearchClientIP(client); - } - - synchronized (client) { - removeClientFromIpCache(client, remote); - } - } catch (Exception e) { - log.error("Uncaught exception when handling a disconnection of " + getChannel(), e); - } - } - - } - - /** - * Gets a hostname or an IP address and returns the textual representation - * of the IP address. - * <p> - * <strong>This method can block</strong> as there is no API for - * asynchronous DNS resolution in the JDK. - * @param host The hostname to resolve. - * @return The IP address associated with the given hostname, - * or {@code null} if the address couldn't be resolved. - */ - private static String getIP(final String host) { - final long start = System.nanoTime(); - try { - final String ip = InetAddress.getByName(host).getHostAddress(); - final long latency = System.nanoTime() - start; - if (latency > 500000/*ns*/ && LOG.isDebugEnabled()) { - LOG.debug("Resolved IP of `" + host + "' to " - + ip + " in " + latency + "ns"); - } else if (latency >= 3000000/*ns*/) { - LOG.warn("Slow DNS lookup! Resolved IP of `" + host + "' to " - + ip + " in " + latency + "ns"); - } - return ip; - } catch (UnknownHostException e) { - LOG.error("Failed to resolve the IP of `" + host + "' in " - + (System.nanoTime() - start) + "ns"); - return null; - } - } - - /** - * Parses a TCP port number from a string. - * @param portnum The string to parse. - * @return A strictly positive, validated port number. - * @throws NumberFormatException if the string couldn't be parsed as an - * integer or if the value was outside of the range allowed for TCP ports. - */ - private static int parsePortNumber(final String portnum) - throws NumberFormatException { - final int port = Integer.parseInt(portnum); - if (port <= 0 || port > 65535) { - throw new NumberFormatException(port == 0 ? "port is zero" : - (port < 0 ? "port is negative: " - : "port is too large: ") + port); - } - return port; - } - void newTimeout(final TimerTask task, final long timeout_ms) { try { timer.newTimeout(task, timeout_ms, MILLISECONDS); @@ -2005,11 +1672,11 @@ public class AsyncKuduClient implements AutoCloseable { @GuardedBy("tabletServers") private void addTabletClient(String uuid, String host, int port, boolean isLeader) throws UnknownHostException { - String ip = getIP(host); + String ip = ConnectionCache.getIP(host); if (ip == null) { throw new UnknownHostException("Failed to resolve the IP of `" + host + "'"); } - TabletClient client = newClient(uuid, ip, port); + TabletClient client = connectionCache.newClient(uuid, ip, port); tabletServers.add(client); if (isLeader) { http://git-wip-us.apache.org/repos/asf/kudu/blob/e5b7ebf8/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java new file mode 100644 index 0000000..f421d32 --- /dev/null +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java @@ -0,0 +1,395 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.kudu.client; + +import com.stumbleupon.async.Callback; +import com.stumbleupon.async.Deferred; +import org.apache.kudu.annotations.InterfaceAudience; +import org.apache.kudu.annotations.InterfaceStability; +import org.jboss.netty.channel.ChannelEvent; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.DefaultChannelPipeline; +import org.jboss.netty.channel.socket.SocketChannel; +import org.jboss.netty.channel.socket.SocketChannelConfig; +import org.jboss.netty.handler.timeout.ReadTimeoutHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +/** + * The ConnectionCache is responsible for managing connections to masters and tablets. There + * should only be one instance per Kudu client, and can <strong>not</strong> be shared between + * clients. + * <p> + * This class is thread-safe. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +class ConnectionCache { + + private static final Logger LOG = LoggerFactory.getLogger(ConnectionCache.class); + + /** + * Cache that maps a TabletServer address ("ip:port") to the clients + * connected to it. + * <p> + * Access to this map must be synchronized by locking its monitor. + * Logging the contents of this map (or calling toString) requires copying it first. + * <p> + * This isn't a {@link ConcurrentHashMap} because we don't use it frequently + * (just when connecting to / disconnecting from TabletClients) and when we + * add something to it, we want to do an atomic get-and-put, but + * {@code putIfAbsent} isn't a good fit for us since it requires to create + * an object that may be "wasted" in case another thread wins the insertion + * race, and we don't want to create unnecessary connections. + * <p> + * Upon disconnection, clients are automatically removed from this map. + * We don't use a {@code ChannelGroup} because a {@code ChannelGroup} does + * the clean-up on the {@code channelClosed} event, which is actually the + * 3rd and last event to be fired when a channel gets disconnected. The + * first one to get fired is, {@code channelDisconnected}. This matters to + * us because we want to purge disconnected clients from the cache as + * quickly as possible after the disconnection, to avoid handing out clients + * that are going to cause unnecessary errors. + * @see TabletClientPipeline#handleDisconnect + */ + @GuardedBy("ip2client") + private final HashMap<String, TabletClient> ip2client = new HashMap<>(); + + private final AsyncKuduClient kuduClient; + + /** + * Create a new empty ConnectionCache that will used the passed client to create connections. + * @param client a client that contains the information we need to create connections + */ + ConnectionCache(AsyncKuduClient client) { + this.kuduClient = client; + } + + TabletClient newClient(String uuid, final String host, final int port) { + final String hostport = host + ':' + port; + TabletClient client; + SocketChannel chan; + synchronized (ip2client) { + client = ip2client.get(hostport); + if (client != null && client.isAlive()) { + return client; + } + final TabletClientPipeline pipeline = new TabletClientPipeline(); + client = pipeline.init(uuid, host, port); + chan = this.kuduClient.getChannelFactory().newChannel(pipeline); + TabletClient oldClient = ip2client.put(hostport, client); + assert oldClient == null; + } + final SocketChannelConfig config = chan.getConfig(); + config.setConnectTimeoutMillis(5000); + config.setTcpNoDelay(true); + // Unfortunately there is no way to override the keep-alive timeout in + // Java since the JRE doesn't expose any way to call setsockopt() with + // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh. + config.setKeepAlive(true); + chan.connect(new InetSocketAddress(host, port)); // Won't block. + return client; + } + + /** + * Closes every socket, which will also cancel all the RPCs in flight. + */ + Deferred<ArrayList<Void>> disconnectEverything() { + ArrayList<Deferred<Void>> deferreds = new ArrayList<>(2); + HashMap<String, TabletClient> ip2client_copy; + synchronized (ip2client) { + // Make a local copy so we can shutdown every Tablet Server clients + // without hold the lock while we iterate over the data structure. + ip2client_copy = new HashMap<>(ip2client); + } + + for (TabletClient ts : ip2client_copy.values()) { + deferreds.add(ts.shutdown()); + } + final int size = deferreds.size(); + return Deferred.group(deferreds).addCallback( + new Callback<ArrayList<Void>, ArrayList<Void>>() { + public ArrayList<Void> call(final ArrayList<Void> arg) { + // Normally, now that we've shutdown() every client, all our caches should + // be empty since each shutdown() generates a DISCONNECTED event, which + // causes TabletClientPipeline to call removeClientFromIpCache(). + HashMap<String, TabletClient> logme = null; + synchronized (ip2client) { + if (!ip2client.isEmpty()) { + logme = new HashMap<>(ip2client); + } + } + if (logme != null) { + // Putting this logging statement inside the synchronized block + // can lead to a deadlock, since HashMap.toString() is going to + // call TabletClient.toString() on each entry, and this locks the + // client briefly. Other parts of the code lock clients first and + // the ip2client HashMap second, so this can easily deadlock. + LOG.error("Some clients are left in the client cache and haven't" + + " been cleaned up: " + logme); + } + return arg; + } + + public String toString() { + return "wait " + size + " TabletClient.shutdown()"; + } + }); + } + + /** + * Modifying the list returned by this method won't affect this cache, + * but calling certain methods on the returned TabletClients can. For example, + * it's possible to forcefully shutdown a connection to a tablet server by calling {@link + * TabletClient#shutdown()}. + * @return copy of the current TabletClients list + */ + List<TabletClient> getTabletClients() { + synchronized (ip2client) { + return new ArrayList<>(ip2client.values()); + } + } + + /** + * Blocking call. Performs a slow search of the IP used by the given client. + * <p> + * This is needed when we're trying to find the IP of the client before its + * channel has successfully connected, because Netty's API offers no way of + * retrieving the IP of the remote peer until we're connected to it. + * @param client the client we want the IP of + * @return the IP of the client, or {@code null} if we couldn't find it + */ + private InetSocketAddress slowSearchClientIP(final TabletClient client) { + String hostport = null; + synchronized (ip2client) { + for (final Map.Entry<String, TabletClient> e : ip2client.entrySet()) { + if (e.getValue() == client) { + hostport = e.getKey(); + break; + } + } + } + + if (hostport == null) { + HashMap<String, TabletClient> copy; + synchronized (ip2client) { + copy = new HashMap<>(ip2client); + } + LOG.error("Couldn't find {} in {}", client, copy); + return null; + } + final int colon = hostport.indexOf(':', 1); + if (colon < 1) { + LOG.error("No `:' found in {}", hostport); + return null; + } + final String host = getIP(hostport.substring(0, colon)); + if (host == null) { + // getIP will print the reason why, there's nothing else we can do. + return null; + } + + int port; + try { + port = parsePortNumber(hostport.substring(colon + 1, + hostport.length())); + } catch (NumberFormatException e) { + LOG.error("Bad port in {}", hostport, e); + return null; + } + return new InetSocketAddress(host, port); + } + + /** + * Removes the given client from the `ip2client` cache. + * @param client the client for which we must clear the ip cache + * @param remote the address of the remote peer, if known, or null + */ + private void removeClientFromIpCache(final TabletClient client, + final SocketAddress remote) { + + if (remote == null) { + return; // Can't continue without knowing the remote address. + } + + String hostport; + if (remote instanceof InetSocketAddress) { + final InetSocketAddress sock = (InetSocketAddress) remote; + final InetAddress addr = sock.getAddress(); + if (addr == null) { + LOG.error("Unresolved IP for {}. This shouldn't happen.", remote); + return; + } else { + hostport = addr.getHostAddress() + ':' + sock.getPort(); + } + } else { + LOG.error("Found a non-InetSocketAddress remote: {}. This shouldn't happen.", remote); + return; + } + + TabletClient old; + synchronized (ip2client) { + old = ip2client.remove(hostport); + } + + LOG.debug("Removed from IP cache: {" + hostport + "} -> {" + client + "}"); + if (old == null) { + LOG.trace("When expiring {} from the client cache (host:port={}" + + "), it was found that there was no entry" + + " corresponding to {}. This shouldn't happen.", client, hostport, remote); + } + } + + /** + * Gets a hostname or an IP address and returns the textual representation + * of the IP address. + * <p> + * <strong>This method can block</strong> as there is no API for + * asynchronous DNS resolution in the JDK. + * @param host the hostname to resolve + * @return the IP address associated with the given hostname, + * or {@code null} if the address couldn't be resolved + */ + static String getIP(final String host) { + final long start = System.nanoTime(); + try { + final String ip = InetAddress.getByName(host).getHostAddress(); + final long latency = System.nanoTime() - start; + if (latency > 500000/*ns*/ && LOG.isDebugEnabled()) { + LOG.debug("Resolved IP of `{}' to {} in {}ns", host, ip, latency); + } else if (latency >= 3000000/*ns*/) { + LOG.warn("Slow DNS lookup! Resolved IP of `{}' to {} in {}ns", host, ip, latency); + } + return ip; + } catch (UnknownHostException e) { + LOG.error("Failed to resolve the IP of `{}' in {}ns", host, (System.nanoTime() - start)); + return null; + } + } + + /** + * Parses a TCP port number from a string. + * @param portnum the string to parse + * @return a strictly positive, validated port number + * @throws NumberFormatException if the string couldn't be parsed as an + * integer or if the value was outside of the range allowed for TCP ports + */ + private static int parsePortNumber(final String portnum) + throws NumberFormatException { + final int port = Integer.parseInt(portnum); + if (port <= 0 || port > 65535) { + throw new NumberFormatException(port == 0 ? "port is zero" : + (port < 0 ? "port is negative: " + : "port is too large: ") + port); + } + return port; + } + + private final class TabletClientPipeline extends DefaultChannelPipeline { + + private final Logger log = LoggerFactory.getLogger(TabletClientPipeline.class); + /** + * Have we already disconnected?. + * We use this to avoid doing the cleanup work for the same client more + * than once, even if we get multiple events indicating that the client + * is no longer connected to the TabletServer (e.g. DISCONNECTED, CLOSED). + * No synchronization needed as this is always accessed from only one + * thread at a time (equivalent to a non-shared state in a Netty handler). + */ + private boolean disconnected = false; + + TabletClient init(String uuid, String host, int port) { + AsyncKuduClient kuduClient = ConnectionCache.this.kuduClient; + final TabletClient client = new TabletClient(kuduClient, uuid, host, port); + if (kuduClient.getDefaultSocketReadTimeoutMs() > 0) { + super.addLast("timeout-handler", + new ReadTimeoutHandler(kuduClient.getTimer(), + kuduClient.getDefaultSocketReadTimeoutMs(), + TimeUnit.MILLISECONDS)); + } + super.addLast("kudu-handler", client); + + return client; + } + + @Override + public void sendDownstream(final ChannelEvent event) { + if (event instanceof ChannelStateEvent) { + handleDisconnect((ChannelStateEvent) event); + } + super.sendDownstream(event); + } + + @Override + public void sendUpstream(final ChannelEvent event) { + if (event instanceof ChannelStateEvent) { + handleDisconnect((ChannelStateEvent) event); + } + super.sendUpstream(event); + } + + private void handleDisconnect(final ChannelStateEvent state_event) { + if (disconnected) { + return; + } + switch (state_event.getState()) { + case OPEN: + if (state_event.getValue() == Boolean.FALSE) { + break; // CLOSED + } + return; + case CONNECTED: + if (state_event.getValue() == null) { + break; // DISCONNECTED + } + return; + default: + return; // Not an event we're interested in, ignore it. + } + + disconnected = true; // So we don't clean up the same client twice. + try { + final TabletClient client = super.get(TabletClient.class); + SocketAddress remote = super.getChannel().getRemoteAddress(); + // At this point Netty gives us no easy way to access the + // SocketAddress of the peer we tried to connect to. This + // kinda sucks but I couldn't find an easier way. + if (remote == null) { + remote = slowSearchClientIP(client); + } + + synchronized (client) { + removeClientFromIpCache(client, remote); + } + } catch (Exception e) { + log.error("Uncaught exception when handling a disconnection of " + getChannel(), e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/kudu/blob/e5b7ebf8/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java index 2de16fd..2c191ff 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java @@ -195,7 +195,7 @@ public class TestAsyncKuduSession extends BaseKuduTest { session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC); session.apply(createBasicSchemaInsert(nonReplicatedTable, 1)).join(); - int numClientsBefore = client.ip2client.size(); + int numClientsBefore = client.getTabletClients().size(); // Restart all the tablet servers. killTabletServers(); @@ -206,7 +206,7 @@ public class TestAsyncKuduSession extends BaseKuduTest { session.apply(createBasicSchemaInsert(nonReplicatedTable, 2)).join(); // We should not have leaked an entry in the client2tablets map. - int numClientsAfter = client.ip2client.size(); + int numClientsAfter = client.getTabletClients().size(); assertEquals(numClientsBefore, numClientsAfter); } finally { restartTabletServers();