Repository: kudu
Updated Branches:
  refs/heads/master 0c52f161b -> 0c44223ea


[java client] Extract ip2client out of AsyncKuduClient

As part of making the Java client more modular and easier to test, this patch
moves almost all of the connection management into a separate class,
ConnectionCache. The change was rather painless.

Change-Id: I48b0f3f262abd5ee26869202f661b3c25158f39c
Reviewed-on: http://gerrit.cloudera.org:8080/4717
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <a...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e5b7ebf8
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e5b7ebf8
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e5b7ebf8

Branch: refs/heads/master
Commit: e5b7ebf8daa4b17a475814e62ed87d9dff4f6095
Parents: 0c52f16
Author: Jean-Daniel Cryans <jdcry...@apache.org>
Authored: Thu Oct 13 12:36:23 2016 -0700
Committer: Jean-Daniel Cryans <jdcry...@apache.org>
Committed: Fri Oct 14 21:38:26 2016 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java | 373 +----------------
 .../org/apache/kudu/client/ConnectionCache.java | 395 +++++++++++++++++++
 .../kudu/client/TestAsyncKuduSession.java       |   4 +-
 3 files changed, 417 insertions(+), 355 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/e5b7ebf8/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index cbb128e..9b5ba5b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -37,9 +37,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.Message;
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.socket.nio.NioWorkerPool;
 import org.apache.kudu.Common;
 import org.apache.kudu.Schema;
 import org.apache.kudu.annotations.InterfaceAudience;
@@ -51,14 +48,10 @@ import org.apache.kudu.util.AsyncUtil;
 import org.apache.kudu.util.NetUtil;
 import org.apache.kudu.util.Pair;
 import org.apache.kudu.util.Slice;
-import org.jboss.netty.channel.ChannelEvent;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.DefaultChannelPipeline;
+import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.SocketChannel;
-import org.jboss.netty.channel.socket.SocketChannelConfig;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.jboss.netty.channel.socket.nio.NioWorkerPool;
 import org.jboss.netty.util.HashedWheelTimer;
 import org.jboss.netty.util.Timeout;
 import org.jboss.netty.util.TimerTask;
@@ -66,19 +59,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.GuardedBy;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
@@ -86,7 +73,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -148,33 +134,7 @@ public class AsyncKuduClient implements AutoCloseable {
   private final ConcurrentHashMap<String, TableLocationsCache> tableLocations =
       new ConcurrentHashMap<>();
 
-  /**
-   * Cache that maps a TabletServer address ("ip:port") to the clients
-   * connected to it.
-   * <p>
-   * Access to this map must be synchronized by locking its monitor.
-   * Logging the contents of this map (or calling toString) requires copying 
it first.
-   * <p>
-   * This isn't a {@link ConcurrentHashMap} because we don't use it frequently
-   * (just when connecting to / disconnecting from TabletClients) and when we
-   * add something to it, we want to do an atomic get-and-put, but
-   * {@code putIfAbsent} isn't a good fit for us since it requires to create
-   * an object that may be "wasted" in case another thread wins the insertion
-   * race, and we don't want to create unnecessary connections.
-   * <p>
-   * Upon disconnection, clients are automatically removed from this map.
-   * We don't use a {@code ChannelGroup} because a {@code ChannelGroup} does
-   * the clean-up on the {@code channelClosed} event, which is actually the
-   * 3rd and last event to be fired when a channel gets disconnected.  The
-   * first one to get fired is, {@code channelDisconnected}.  This matters to
-   * us because we want to purge disconnected clients from the cache as
-   * quickly as possible after the disconnection, to avoid handing out clients
-   * that are going to cause unnecessary errors.
-   * @see TabletClientPipeline#handleDisconnect
-   */
-  @VisibleForTesting
-  @GuardedBy("ip2client")
-  final HashMap<String, TabletClient> ip2client = new HashMap<>();
+  private final ConnectionCache connectionCache;
 
   @GuardedBy("sessions")
   private final Set<AsyncKuduSession> sessions = new HashSet<>();
@@ -241,6 +201,7 @@ public class AsyncKuduClient implements AutoCloseable {
     this.timer = b.timer;
     String clientId = UUID.randomUUID().toString().replace("-", "");
     this.requestTracker = new RequestTracker(clientId);
+    this.connectionCache = new ConnectionCache(this);
   }
 
   /**
@@ -579,6 +540,14 @@ public class AsyncKuduClient implements AutoCloseable {
     return requestTracker;
   }
 
+  HashedWheelTimer getTimer() {
+    return timer;
+  }
+
+  ClientSocketChannelFactory getChannelFactory() {
+    return channelFactory;
+  }
+
   /**
    * Creates a new {@link AsyncKuduScanner.AsyncKuduScannerBuilder} for a 
particular table.
    * @param table the name of the table you intend to scan.
@@ -958,13 +927,11 @@ public class AsyncKuduClient implements AutoCloseable {
    * but calling certain methods on the returned TabletClients can. For 
example,
    * it's possible to forcefully shutdown a connection to a tablet server by 
calling {@link
    * TabletClient#shutdown()}.
-   * @return Copy of the current TabletClients list
+   * @return copy of the current TabletClients list
    */
   @VisibleForTesting
   List<TabletClient> getTabletClients() {
-    synchronized (ip2client) {
-      return new ArrayList<TabletClient>(ip2client.values());
-    }
+    return connectionCache.getTabletClients();
   }
 
   /**
@@ -1476,7 +1443,7 @@ public class AsyncKuduClient implements AutoCloseable {
    * @return A live and initialized client for the specified master server.
    */
   TabletClient newMasterClient(HostAndPort masterHostPort) {
-    String ip = getIP(masterHostPort.getHostText());
+    String ip = ConnectionCache.getIP(masterHostPort.getHostText());
     if (ip == null) {
       return null;
     }
@@ -1484,36 +1451,11 @@ public class AsyncKuduClient implements AutoCloseable {
     // communicate with the masters to find out about them, and that's what 
we're trying to do.
     // The UUID is used for logging, so instead we're passing the "master 
table name" followed by
     // host and port which is enough to identify the node we're connecting to.
-    return newClient(MASTER_TABLE_NAME_PLACEHOLDER + " - " + 
masterHostPort.toString(),
+    return connectionCache.newClient(
+        MASTER_TABLE_NAME_PLACEHOLDER + " - " + masterHostPort.toString(),
         ip, masterHostPort.getPort());
   }
 
-  TabletClient newClient(String uuid, final String host, final int port) {
-    final String hostport = host + ':' + port;
-    TabletClient client;
-    SocketChannel chan;
-    synchronized (ip2client) {
-      client = ip2client.get(hostport);
-      if (client != null && client.isAlive()) {
-        return client;
-      }
-      final TabletClientPipeline pipeline = new TabletClientPipeline();
-      client = pipeline.init(uuid, host, port);
-      chan = channelFactory.newChannel(pipeline);
-      TabletClient oldClient = ip2client.put(hostport, client);
-      assert oldClient == null;
-    }
-    final SocketChannelConfig config = chan.getConfig();
-    config.setConnectTimeoutMillis(5000);
-    config.setTcpNoDelay(true);
-    // Unfortunately there is no way to override the keep-alive timeout in
-    // Java since the JRE doesn't expose any way to call setsockopt() with
-    // TCP_KEEPIDLE.  And of course the default timeout is >2h. Sigh.
-    config.setKeepAlive(true);
-    chan.connect(new InetSocketAddress(host, port));  // Won't block.
-    return client;
-  }
-
   /**
    * Invokes {@link #shutdown()} and waits for the configured admin timeout. 
This method returns
    * void, so consider invoking shutdown directly if there's a need to handle 
dangling RPCs.
@@ -1577,7 +1519,7 @@ public class AsyncKuduClient implements AutoCloseable {
     final class DisconnectCB implements Callback<Deferred<ArrayList<Void>>,
         ArrayList<List<OperationResponse>>> {
       public Deferred<ArrayList<Void>> call(ArrayList<List<OperationResponse>> 
ignoredResponses) {
-        return disconnectEverything().addCallback(new ReleaseResourcesCB());
+        return connectionCache.disconnectEverything().addCallback(new 
ReleaseResourcesCB());
       }
       public String toString() {
         return "disconnect callback";
@@ -1614,287 +1556,12 @@ public class AsyncKuduClient implements AutoCloseable {
     return Deferred.group(deferreds);
   }
 
-  /**
-   * Closes every socket, which will also cancel all the RPCs in flight.
-   */
-  private Deferred<ArrayList<Void>> disconnectEverything() {
-    ArrayList<Deferred<Void>> deferreds =
-        new ArrayList<Deferred<Void>>(2);
-    HashMap<String, TabletClient> ip2client_copy;
-    synchronized (ip2client) {
-      // Make a local copy so we can shutdown every Tablet Server clients
-      // without hold the lock while we iterate over the data structure.
-      ip2client_copy = new HashMap<String, TabletClient>(ip2client);
-    }
-
-    for (TabletClient ts : ip2client_copy.values()) {
-      deferreds.add(ts.shutdown());
-    }
-    final int size = deferreds.size();
-    return Deferred.group(deferreds).addCallback(
-        new Callback<ArrayList<Void>, ArrayList<Void>>() {
-          public ArrayList<Void> call(final ArrayList<Void> arg) {
-            // Normally, now that we've shutdown() every client, all our 
caches should
-            // be empty since each shutdown() generates a DISCONNECTED event, 
which
-            // causes TabletClientPipeline to call removeClientFromIpCache().
-            HashMap<String, TabletClient> logme = null;
-            synchronized (ip2client) {
-              if (!ip2client.isEmpty()) {
-                logme = new HashMap<String, TabletClient>(ip2client);
-              }
-            }
-            if (logme != null) {
-              // Putting this logging statement inside the synchronized block
-              // can lead to a deadlock, since HashMap.toString() is going to
-              // call TabletClient.toString() on each entry, and this locks the
-              // client briefly.  Other parts of the code lock clients first 
and
-              // the ip2client HashMap second, so this can easily deadlock.
-              LOG.error("Some clients are left in the client cache and haven't"
-                  + " been cleaned up: " + logme);
-            }
-            return arg;
-          }
-
-          public String toString() {
-            return "wait " + size + " TabletClient.shutdown()";
-          }
-        });
-  }
-
-  /**
-   * Blocking call.
-   * Performs a slow search of the IP used by the given client.
-   * <p>
-   * This is needed when we're trying to find the IP of the client before its
-   * channel has successfully connected, because Netty's API offers no way of
-   * retrieving the IP of the remote peer until we're connected to it.
-   * @param client The client we want the IP of.
-   * @return The IP of the client, or {@code null} if we couldn't find it.
-   */
-  private InetSocketAddress slowSearchClientIP(final TabletClient client) {
-    String hostport = null;
-    synchronized (ip2client) {
-      for (final Map.Entry<String, TabletClient> e : ip2client.entrySet()) {
-        if (e.getValue() == client) {
-          hostport = e.getKey();
-          break;
-        }
-      }
-    }
-
-    if (hostport == null) {
-      HashMap<String, TabletClient> copy;
-      synchronized (ip2client) {
-        copy = new HashMap<String, TabletClient>(ip2client);
-      }
-      LOG.error("WTF?  Should never happen!  Couldn't find " + client
-          + " in " + copy);
-      return null;
-    }
-    final int colon = hostport.indexOf(':', 1);
-    if (colon < 1) {
-      LOG.error("WTF?  Should never happen!  No `:' found in " + hostport);
-      return null;
-    }
-    final String host = getIP(hostport.substring(0, colon));
-    if (host == null) {
-      // getIP will print the reason why, there's nothing else we can do.
-      return null;
-    }
-
-    int port;
-    try {
-      port = parsePortNumber(hostport.substring(colon + 1,
-          hostport.length()));
-    } catch (NumberFormatException e) {
-      LOG.error("WTF?  Should never happen!  Bad port in " + hostport, e);
-      return null;
-    }
-    return new InetSocketAddress(host, port);
-  }
-
-  /**
-   * Removes the given client from the `ip2client` cache.
-   * @param client The client for which we must clear the ip cache
-   * @param remote The address of the remote peer, if known, or null
-   */
-  private void removeClientFromIpCache(final TabletClient client,
-                                       final SocketAddress remote) {
-
-    if (remote == null) {
-      return;  // Can't continue without knowing the remote address.
-    }
-
-    String hostport;
-    if (remote instanceof InetSocketAddress) {
-      final InetSocketAddress sock = (InetSocketAddress) remote;
-      final InetAddress addr = sock.getAddress();
-      if (addr == null) {
-        LOG.error("WTF?  Unresolved IP for " + remote
-            + ".  This shouldn't happen.");
-        return;
-      } else {
-        hostport = addr.getHostAddress() + ':' + sock.getPort();
-      }
-    } else {
-      LOG.error("WTF?  Found a non-InetSocketAddress remote: " + remote
-          + ".  This shouldn't happen.");
-      return;
-    }
-
-    TabletClient old;
-    synchronized (ip2client) {
-      old = ip2client.remove(hostport);
-    }
-
-    LOG.debug("Removed from IP cache: {" + hostport + "} -> {" + client + "}");
-    if (old == null) {
-      // Currently we're seeing this message when masters are disconnected and 
the hostport we got
-      // above is different than the one the user passes (that we use to 
populate ip2client). At
-      // worst this doubles the entries for masters, which has an 
insignificant impact.
-      // TODO When fixed, make this a WARN again.
-      LOG.trace("When expiring " + client + " from the client cache 
(host:port="
-          + hostport + "), it was found that there was no entry"
-          + " corresponding to " + remote + ".  This shouldn't happen.");
-    }
-  }
-
   private boolean isMasterTable(String tableId) {
     // Checking that it's the same instance so there's absolutely no chance of 
confusing the master
     // 'table' for a user one.
     return MASTER_TABLE_NAME_PLACEHOLDER == tableId;
   }
 
-  private final class TabletClientPipeline extends DefaultChannelPipeline {
-
-    private final Logger log = 
LoggerFactory.getLogger(TabletClientPipeline.class);
-    /**
-     * Have we already disconnected?.
-     * We use this to avoid doing the cleanup work for the same client more
-     * than once, even if we get multiple events indicating that the client
-     * is no longer connected to the TabletServer (e.g. DISCONNECTED, CLOSED).
-     * No synchronization needed as this is always accessed from only one
-     * thread at a time (equivalent to a non-shared state in a Netty handler).
-     */
-    private boolean disconnected = false;
-
-    TabletClient init(String uuid, String host, int port) {
-      final TabletClient client = new TabletClient(AsyncKuduClient.this, uuid, 
host, port);
-      if (defaultSocketReadTimeoutMs > 0) {
-        super.addLast("timeout-handler",
-            new ReadTimeoutHandler(timer,
-                defaultSocketReadTimeoutMs,
-                TimeUnit.MILLISECONDS));
-      }
-      super.addLast("kudu-handler", client);
-
-      return client;
-    }
-
-    @Override
-    public void sendDownstream(final ChannelEvent event) {
-      if (event instanceof ChannelStateEvent) {
-        handleDisconnect((ChannelStateEvent) event);
-      }
-      super.sendDownstream(event);
-    }
-
-    @Override
-    public void sendUpstream(final ChannelEvent event) {
-      if (event instanceof ChannelStateEvent) {
-        handleDisconnect((ChannelStateEvent) event);
-      }
-      super.sendUpstream(event);
-    }
-
-    private void handleDisconnect(final ChannelStateEvent state_event) {
-      if (disconnected) {
-        return;
-      }
-      switch (state_event.getState()) {
-        case OPEN:
-          if (state_event.getValue() == Boolean.FALSE) {
-            break;  // CLOSED
-          }
-          return;
-        case CONNECTED:
-          if (state_event.getValue() == null) {
-            break;  // DISCONNECTED
-          }
-          return;
-        default:
-          return;  // Not an event we're interested in, ignore it.
-      }
-
-      disconnected = true;  // So we don't clean up the same client twice.
-      try {
-        final TabletClient client = super.get(TabletClient.class);
-        SocketAddress remote = super.getChannel().getRemoteAddress();
-        // At this point Netty gives us no easy way to access the
-        // SocketAddress of the peer we tried to connect to. This
-        // kinda sucks but I couldn't find an easier way.
-        if (remote == null) {
-          remote = slowSearchClientIP(client);
-        }
-
-        synchronized (client) {
-          removeClientFromIpCache(client, remote);
-        }
-      } catch (Exception e) {
-        log.error("Uncaught exception when handling a disconnection of " + 
getChannel(), e);
-      }
-    }
-
-  }
-
-  /**
-   * Gets a hostname or an IP address and returns the textual representation
-   * of the IP address.
-   * <p>
-   * <strong>This method can block</strong> as there is no API for
-   * asynchronous DNS resolution in the JDK.
-   * @param host The hostname to resolve.
-   * @return The IP address associated with the given hostname,
-   * or {@code null} if the address couldn't be resolved.
-   */
-  private static String getIP(final String host) {
-    final long start = System.nanoTime();
-    try {
-      final String ip = InetAddress.getByName(host).getHostAddress();
-      final long latency = System.nanoTime() - start;
-      if (latency > 500000/*ns*/ && LOG.isDebugEnabled()) {
-        LOG.debug("Resolved IP of `" + host + "' to "
-            + ip + " in " + latency + "ns");
-      } else if (latency >= 3000000/*ns*/) {
-        LOG.warn("Slow DNS lookup!  Resolved IP of `" + host + "' to "
-            + ip + " in " + latency + "ns");
-      }
-      return ip;
-    } catch (UnknownHostException e) {
-      LOG.error("Failed to resolve the IP of `" + host + "' in "
-          + (System.nanoTime() - start) + "ns");
-      return null;
-    }
-  }
-
-  /**
-   * Parses a TCP port number from a string.
-   * @param portnum The string to parse.
-   * @return A strictly positive, validated port number.
-   * @throws NumberFormatException if the string couldn't be parsed as an
-   * integer or if the value was outside of the range allowed for TCP ports.
-   */
-  private static int parsePortNumber(final String portnum)
-      throws NumberFormatException {
-    final int port = Integer.parseInt(portnum);
-    if (port <= 0 || port > 65535) {
-      throw new NumberFormatException(port == 0 ? "port is zero" :
-          (port < 0 ? "port is negative: "
-              : "port is too large: ") + port);
-    }
-    return port;
-  }
-
   void newTimeout(final TimerTask task, final long timeout_ms) {
     try {
       timer.newTimeout(task, timeout_ms, MILLISECONDS);
@@ -2005,11 +1672,11 @@ public class AsyncKuduClient implements AutoCloseable {
     @GuardedBy("tabletServers")
     private void addTabletClient(String uuid, String host, int port, boolean 
isLeader)
         throws UnknownHostException {
-      String ip = getIP(host);
+      String ip = ConnectionCache.getIP(host);
       if (ip == null) {
         throw new UnknownHostException("Failed to resolve the IP of `" + host 
+ "'");
       }
-      TabletClient client = newClient(uuid, ip, port);
+      TabletClient client = connectionCache.newClient(uuid, ip, port);
 
       tabletServers.add(client);
       if (isLeader) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/e5b7ebf8/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
new file mode 100644
index 0000000..f421d32
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -0,0 +1,395 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.client;
+
+import com.stumbleupon.async.Callback;
+import com.stumbleupon.async.Deferred;
+import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.annotations.InterfaceStability;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.DefaultChannelPipeline;
+import org.jboss.netty.channel.socket.SocketChannel;
+import org.jboss.netty.channel.socket.SocketChannelConfig;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The ConnectionCache is responsible for managing connections to masters and 
tablet servers. There
+ * should only be one instance per Kudu client, and it can <strong>not</strong> 
be shared between
+ * clients.
+ * <p>
+ * This class is thread-safe.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class ConnectionCache {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ConnectionCache.class);
+
+  /**
+   * Cache that maps a TabletServer address ("ip:port") to the clients
+   * connected to it.
+   * <p>
+   * Access to this map must be synchronized by locking its monitor.
+   * Logging the contents of this map (or calling toString) requires copying 
it first.
+   * <p>
+   * This isn't a {@link ConcurrentHashMap} because we don't use it frequently
+   * (just when connecting to / disconnecting from TabletClients) and when we
+   * add something to it, we want to do an atomic get-and-put, but
+   * {@code putIfAbsent} isn't a good fit for us since it requires creating
+   * an object that may be "wasted" in case another thread wins the insertion
+   * race, and we don't want to create unnecessary connections.
+   * <p>
+   * Upon disconnection, clients are automatically removed from this map.
+   * We don't use a {@code ChannelGroup} because a {@code ChannelGroup} does
+   * the clean-up on the {@code channelClosed} event, which is actually the
+   * 3rd and last event to be fired when a channel gets disconnected. The
+   * first one to get fired is {@code channelDisconnected}. This matters to
+   * us because we want to purge disconnected clients from the cache as
+   * quickly as possible after the disconnection, to avoid handing out clients
+   * that are going to cause unnecessary errors.
+   * @see TabletClientPipeline#handleDisconnect
+   */
+  @GuardedBy("ip2client")
+  private final HashMap<String, TabletClient> ip2client = new HashMap<>();
+
+  private final AsyncKuduClient kuduClient;
+
+  /**
+   * Create a new empty ConnectionCache that will use the passed client to 
create connections.
+   * @param client a client that contains the information we need to create 
connections
+   */
+  ConnectionCache(AsyncKuduClient client) {
+    this.kuduClient = client;
+  }
+
+  TabletClient newClient(String uuid, final String host, final int port) {
+    final String hostport = host + ':' + port;
+    TabletClient client;
+    SocketChannel chan;
+    synchronized (ip2client) {
+      client = ip2client.get(hostport);
+      if (client != null && client.isAlive()) {
+        return client;
+      }
+      final TabletClientPipeline pipeline = new TabletClientPipeline();
+      client = pipeline.init(uuid, host, port);
+      chan = this.kuduClient.getChannelFactory().newChannel(pipeline);
+      TabletClient oldClient = ip2client.put(hostport, client);
+      assert oldClient == null;
+    }
+    final SocketChannelConfig config = chan.getConfig();
+    config.setConnectTimeoutMillis(5000);
+    config.setTcpNoDelay(true);
+    // Unfortunately there is no way to override the keep-alive timeout in
+    // Java since the JRE doesn't expose any way to call setsockopt() with
+    // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh.
+    config.setKeepAlive(true);
+    chan.connect(new InetSocketAddress(host, port));  // Won't block.
+    return client;
+  }
+
+  /**
+   * Closes every socket, which will also cancel all the RPCs in flight.
+   */
+  Deferred<ArrayList<Void>> disconnectEverything() {
+    ArrayList<Deferred<Void>> deferreds = new ArrayList<>(2);
+    HashMap<String, TabletClient> ip2client_copy;
+    synchronized (ip2client) {
+      // Make a local copy so we can shutdown every Tablet Server clients
+      // without hold the lock while we iterate over the data structure.
+      ip2client_copy = new HashMap<>(ip2client);
+    }
+
+    for (TabletClient ts : ip2client_copy.values()) {
+      deferreds.add(ts.shutdown());
+    }
+    final int size = deferreds.size();
+    return Deferred.group(deferreds).addCallback(
+        new Callback<ArrayList<Void>, ArrayList<Void>>() {
+          public ArrayList<Void> call(final ArrayList<Void> arg) {
+            // Normally, now that we've shutdown() every client, all our 
caches should
+            // be empty since each shutdown() generates a DISCONNECTED event, 
which
+            // causes TabletClientPipeline to call removeClientFromIpCache().
+            HashMap<String, TabletClient> logme = null;
+            synchronized (ip2client) {
+              if (!ip2client.isEmpty()) {
+                logme = new HashMap<>(ip2client);
+              }
+            }
+            if (logme != null) {
+              // Putting this logging statement inside the synchronized block
+              // can lead to a deadlock, since HashMap.toString() is going to
+              // call TabletClient.toString() on each entry, and this locks the
+              // client briefly. Other parts of the code lock clients first and
+              // the ip2client HashMap second, so this can easily deadlock.
+              LOG.error("Some clients are left in the client cache and haven't"
+                  + " been cleaned up: " + logme);
+            }
+            return arg;
+          }
+
+          public String toString() {
+            return "wait " + size + " TabletClient.shutdown()";
+          }
+        });
+  }
+
+  /**
+   * Modifying the list returned by this method won't affect this cache,
+   * but calling certain methods on the returned TabletClients can. For 
example,
+   * it's possible to forcefully shutdown a connection to a tablet server by 
calling {@link
+   * TabletClient#shutdown()}.
+   * @return copy of the current TabletClients list
+   */
+  List<TabletClient> getTabletClients() {
+    synchronized (ip2client) {
+      return new ArrayList<>(ip2client.values());
+    }
+  }
+
+  /**
+   * Blocking call. Performs a slow search of the IP used by the given client.
+   * <p>
+   * This is needed when we're trying to find the IP of the client before its
+   * channel has successfully connected, because Netty's API offers no way of
+   * retrieving the IP of the remote peer until we're connected to it.
+   * @param client the client we want the IP of
+   * @return the IP of the client, or {@code null} if we couldn't find it
+   */
+  private InetSocketAddress slowSearchClientIP(final TabletClient client) {
+    String hostport = null;
+    synchronized (ip2client) {
+      for (final Map.Entry<String, TabletClient> e : ip2client.entrySet()) {
+        if (e.getValue() == client) {
+          hostport = e.getKey();
+          break;
+        }
+      }
+    }
+
+    if (hostport == null) {
+      HashMap<String, TabletClient> copy;
+      synchronized (ip2client) {
+        copy = new HashMap<>(ip2client);
+      }
+      LOG.error("Couldn't find {} in {}", client, copy);
+      return null;
+    }
+    final int colon = hostport.indexOf(':', 1);
+    if (colon < 1) {
+      LOG.error("No `:' found in {}", hostport);
+      return null;
+    }
+    final String host = getIP(hostport.substring(0, colon));
+    if (host == null) {
+      // getIP will print the reason why, there's nothing else we can do.
+      return null;
+    }
+
+    int port;
+    try {
+      port = parsePortNumber(hostport.substring(colon + 1,
+          hostport.length()));
+    } catch (NumberFormatException e) {
+      LOG.error("Bad port in {}", hostport, e);
+      return null;
+    }
+    return new InetSocketAddress(host, port);
+  }
+
+  /**
+   * Removes the given client from the `ip2client` cache.
+   * <p>
+   * The entry is located by the {@code ip:port} key derived from
+   * {@code remote}; whatever is stored under that key is removed. Nothing is
+   * removed when {@code remote} is null, isn't an {@link InetSocketAddress},
+   * or has no resolved address.
+   * @param client the client for which we must clear the ip cache
+   * @param remote the address of the remote peer, if known, or null
+   */
+  private void removeClientFromIpCache(final TabletClient client,
+                                       final SocketAddress remote) {
+    if (remote == null) {
+      return;  // Can't continue without knowing the remote address.
+    }
+    if (!(remote instanceof InetSocketAddress)) {
+      LOG.error("Found a non-InetSocketAddress remote: {}. This shouldn't happen.", remote);
+      return;
+    }
+
+    final InetSocketAddress sock = (InetSocketAddress) remote;
+    final InetAddress addr = sock.getAddress();
+    if (addr == null) {
+      LOG.error("Unresolved IP for {}. This shouldn't happen.", remote);
+      return;
+    }
+    final String hostport = addr.getHostAddress() + ':' + sock.getPort();
+
+    final TabletClient old;
+    synchronized (ip2client) {
+      old = ip2client.remove(hostport);
+    }
+
+    if (old == null) {
+      LOG.trace("When expiring {} from the client cache (host:port={}"
+          + "), it was found that there was no entry"
+          + " corresponding to {}. This shouldn't happen.", client, hostport, remote);
+    } else {
+      // Only claim a removal when one actually happened. The doubled braces
+      // make SLF4J print a literal '{' and '}' around each substituted value.
+      LOG.debug("Removed from IP cache: {{}} -> {{}}", hostport, client);
+    }
+  }
+
+  /**
+   * Gets a hostname or an IP address and returns the textual representation
+   * of the IP address.
+   * <p>
+   * <strong>This method can block</strong> as there is no API for
+   * asynchronous DNS resolution in the JDK.
+   * <p>
+   * Lookups that take 3ms or longer are reported at WARN level; other lookups
+   * that take more than 0.5ms are reported at DEBUG level.
+   * @param host the hostname to resolve
+   * @return the IP address associated with the given hostname,
+   * or {@code null} if the address couldn't be resolved
+   */
+  static String getIP(final String host) {
+    final long slowLookupNanos = TimeUnit.MILLISECONDS.toNanos(3);
+    final long noteworthyLookupNanos = TimeUnit.MICROSECONDS.toNanos(500);
+    final long start = System.nanoTime();
+    try {
+      final String ip = InetAddress.getByName(host).getHostAddress();
+      final long latency = System.nanoTime() - start;
+      // Test the slow threshold first. Every slow lookup also exceeds the
+      // DEBUG threshold, so checking DEBUG first would swallow the warning
+      // whenever debug logging is enabled.
+      if (latency >= slowLookupNanos) {
+        LOG.warn("Slow DNS lookup! Resolved IP of `{}' to {} in {}ns", host, ip, latency);
+      } else if (latency > noteworthyLookupNanos && LOG.isDebugEnabled()) {
+        LOG.debug("Resolved IP of `{}' to {} in {}ns", host, ip, latency);
+      }
+      return ip;
+    } catch (UnknownHostException e) {
+      // The trailing exception is not bound to a placeholder, so the logger
+      // records it as the cause instead of dropping it.
+      LOG.error("Failed to resolve the IP of `{}' in {}ns", host,
+          (System.nanoTime() - start), e);
+      return null;
+    }
+  }
+
+  /**
+   * Converts the textual form of a TCP port into its numeric value.
+   * @param portnum the text to convert
+   * @return the port number, always within 1 to 65535 inclusive
+   * @throws NumberFormatException if the text isn't an integer, or if the
+   * integer is zero, negative, or larger than 65535
+   */
+  private static int parsePortNumber(final String portnum)
+      throws NumberFormatException {
+    final int parsed = Integer.parseInt(portnum);
+    if (parsed >= 1 && parsed <= 65535) {
+      return parsed;
+    }
+    // Out of range: pick the message that says in which way.
+    if (parsed == 0) {
+      throw new NumberFormatException("port is zero");
+    }
+    final String problem = parsed < 0 ? "port is negative: " : "port is too large: ";
+    throw new NumberFormatException(problem + parsed);
+  }
+
+  /**
+   * Channel pipeline that hosts a single {@link TabletClient} handler and
+   * removes that client from the IP cache the first time a CLOSED or
+   * DISCONNECTED channel state event passes through it, in either direction.
+   */
+  private final class TabletClientPipeline extends DefaultChannelPipeline {
+
+    private final Logger log = LoggerFactory.getLogger(TabletClientPipeline.class);
+
+    /**
+     * Have we already disconnected?.
+     * We use this to avoid doing the cleanup work for the same client more
+     * than once, even if we get multiple events indicating that the client
+     * is no longer connected to the TabletServer (e.g. DISCONNECTED, CLOSED).
+     * No synchronization needed as this is always accessed from only one
+     * thread at a time (equivalent to a non-shared state in a Netty handler).
+     */
+    private boolean disconnected = false;
+
+    /**
+     * Creates the {@link TabletClient} for this pipeline and installs the
+     * handlers: a read timeout handler first, but only when the Kudu client's
+     * default socket read timeout is positive, then the client itself.
+     * @param uuid passed through to the TabletClient constructor
+     * @param host passed through to the TabletClient constructor
+     * @param port passed through to the TabletClient constructor
+     * @return the newly created client, already added to this pipeline
+     */
+    TabletClient init(String uuid, String host, int port) {
+      final AsyncKuduClient kuduClient = ConnectionCache.this.kuduClient;
+      final TabletClient client = new TabletClient(kuduClient, uuid, host, port);
+      if (kuduClient.getDefaultSocketReadTimeoutMs() > 0) {
+        super.addLast("timeout-handler",
+            new ReadTimeoutHandler(kuduClient.getTimer(),
+                kuduClient.getDefaultSocketReadTimeoutMs(),
+                TimeUnit.MILLISECONDS));
+      }
+      super.addLast("kudu-handler", client);
+      return client;
+    }
+
+    @Override
+    public void sendDownstream(final ChannelEvent event) {
+      if (event instanceof ChannelStateEvent) {
+        handleDisconnect((ChannelStateEvent) event);
+      }
+      super.sendDownstream(event);
+    }
+
+    @Override
+    public void sendUpstream(final ChannelEvent event) {
+      if (event instanceof ChannelStateEvent) {
+        handleDisconnect((ChannelStateEvent) event);
+      }
+      super.sendUpstream(event);
+    }
+
+    /**
+     * Tells whether a state event means the channel was closed or disconnected.
+     * @param stateEvent the event to classify
+     * @return true for OPEN with a FALSE value (CLOSED) or CONNECTED with a
+     * null value (DISCONNECTED), false for everything else
+     */
+    private boolean signalsDisconnect(final ChannelStateEvent stateEvent) {
+      switch (stateEvent.getState()) {
+        case OPEN:
+          // equals() rather than ==, so the check doesn't depend on the
+          // event carrying the canonical Boolean.FALSE instance.
+          return Boolean.FALSE.equals(stateEvent.getValue());  // CLOSED
+        case CONNECTED:
+          return stateEvent.getValue() == null;  // DISCONNECTED
+        default:
+          return false;  // Not an event we're interested in, ignore it.
+      }
+    }
+
+    /**
+     * Removes this pipeline's client from the IP cache the first time a
+     * CLOSED or DISCONNECTED event is seen; later events are ignored.
+     * Any exception raised during the cleanup is logged and not propagated.
+     * @param stateEvent the channel state event travelling through the pipeline
+     */
+    private void handleDisconnect(final ChannelStateEvent stateEvent) {
+      if (disconnected || !signalsDisconnect(stateEvent)) {
+        return;
+      }
+
+      disconnected = true;  // So we don't clean up the same client twice.
+      try {
+        final TabletClient client = super.get(TabletClient.class);
+        SocketAddress remote = super.getChannel().getRemoteAddress();
+        // When the channel can't tell us the remote address, fall back to
+        // searching the cache for the key this client was registered under.
+        if (remote == null) {
+          remote = slowSearchClientIP(client);
+        }
+
+        // NOTE(review): presumably this serializes the removal with other
+        // code that locks on the TabletClient; confirm against its users.
+        synchronized (client) {
+          removeClientFromIpCache(client, remote);
+        }
+      } catch (Exception e) {
+        log.error("Uncaught exception when handling a disconnection of " + getChannel(), e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/e5b7ebf8/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
index 2de16fd..2c191ff 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
@@ -195,7 +195,7 @@ public class TestAsyncKuduSession extends BaseKuduTest {
       session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
       session.apply(createBasicSchemaInsert(nonReplicatedTable, 1)).join();
 
-      int numClientsBefore = client.ip2client.size();
+      int numClientsBefore = client.getTabletClients().size();
 
       // Restart all the tablet servers.
       killTabletServers();
@@ -206,7 +206,7 @@ public class TestAsyncKuduSession extends BaseKuduTest {
       session.apply(createBasicSchemaInsert(nonReplicatedTable, 2)).join();
 
       // We should not have leaked an entry in the client2tablets map.
-      int numClientsAfter = client.ip2client.size();
+      int numClientsAfter = client.getTabletClients().size();
       assertEquals(numClientsBefore, numClientsAfter);
     } finally {
       restartTabletServers();

Reply via email to