HDFS-13232. RBF: ConnectionManager's cleanup task will compare each pool's own active conns with its total conns. Contributed by Chao Sun.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0c2b969e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0c2b969e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0c2b969e

Branch: refs/heads/HDFS-12996
Commit: 0c2b969e0161a068bf9ae013c4b95508dfb90a8a
Parents: 7ef4d94
Author: Inigo Goiri <inigo...@apache.org>
Authored: Thu Mar 8 09:32:05 2018 -0800
Committer: Inigo Goiri <inigo...@apache.org>
Committed: Thu Mar 8 09:32:05 2018 -0800

----------------------------------------------------------------------
 .../federation/router/ConnectionManager.java    |  59 +++++-----
 .../federation/router/ConnectionPoolId.java     |   6 +
 .../router/TestConnectionManager.java           | 114 +++++++++++++++++++
 3 files changed, 153 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
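
The root cause is visible in the ConnectionManager.java hunk below: cleanup() used to
live in the inner cleanup task and called the unqualified getNumActiveConnections(),
i.e. the active count across all pools, while comparing it against the individual
pool's total. The method now lives on ConnectionManager itself and uses
pool.getNumActiveConnections(). A minimal sketch of the corrected per-pool check
follows; the names PoolCleanupSketch, SimplePool and shouldShrink are hypothetical,
and the 60 second period only stands in for connectionCleanupPeriodMs:

public class PoolCleanupSketch {

  /** Hypothetical stand-in for the per-pool counters that ConnectionPool exposes. */
  interface SimplePool {
    int getNumConnections();        // total connections held by this pool
    int getNumActiveConnections();  // connections of this pool currently in use
    int getMinSize();
    long getLastActiveTime();       // timestamp (ms) of the pool's last use
  }

  private static final double MIN_ACTIVE_RATIO = 0.5;
  private static final long CLEANUP_PERIOD_MS = 60 * 1000;

  /** Returns true when one idle connection may be removed from this pool. */
  static boolean shouldShrink(SimplePool pool, long nowMs) {
    if (pool.getNumConnections() <= pool.getMinSize()) {
      return false;  // never shrink below the configured minimum size
    }
    long idleMs = nowMs - pool.getLastActiveTime();
    int total = pool.getNumConnections();
    // Key point of the fix: use this pool's own active count, not the total
    // across all pools managed by the ConnectionManager.
    int active = pool.getNumActiveConnections();
    return idleMs > CLEANUP_PERIOD_MS || active < MIN_ACTIVE_RATIO * total;
  }
}

The new TestConnectionManager below exercises exactly this behavior: pool1 (9 total /
4 active) loses one connection on the first cleanup() call and is then left alone once
the 50% ratio is reached, while pool2 (10/10) is never shrunk.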


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c2b969e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
index 2e45280..594f489 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
@@ -32,6 +32,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -303,6 +304,38 @@ public class ConnectionManager {
     return JSON.toString(info);
   }
 
+  @VisibleForTesting
+  Map<ConnectionPoolId, ConnectionPool> getPools() {
+    return this.pools;
+  }
+
+  /**
+   * Clean the unused connections for this pool.
+   *
+   * @param pool Connection pool to cleanup.
+   */
+  @VisibleForTesting
+  void cleanup(ConnectionPool pool) {
+    if (pool.getNumConnections() > pool.getMinSize()) {
+      // Check if the pool hasn't been active in a while or not 50% are used
+      long timeSinceLastActive = Time.now() - pool.getLastActiveTime();
+      int total = pool.getNumConnections();
+      int active = pool.getNumActiveConnections();
+      if (timeSinceLastActive > connectionCleanupPeriodMs ||
+          active < MIN_ACTIVE_RATIO * total) {
+        // Remove and close 1 connection
+        List<ConnectionContext> conns = pool.removeConnections(1);
+        for (ConnectionContext conn : conns) {
+          conn.close();
+        }
+        LOG.debug("Removed connection {} used {} seconds ago. " +
+                "Pool has {}/{} connections", pool.getConnectionPoolId(),
+            TimeUnit.MILLISECONDS.toSeconds(timeSinceLastActive),
+            pool.getNumConnections(), pool.getMaxSize());
+      }
+    }
+  }
+
   /**
    * Removes stale connections not accessed recently from the pool. This is
    * invoked periodically.
@@ -350,32 +383,6 @@ public class ConnectionManager {
         }
       }
     }
-
-    /**
-     * Clean the unused connections for this pool.
-     *
-     * @param pool Connection pool to cleanup.
-     */
-    private void cleanup(ConnectionPool pool) {
-      if (pool.getNumConnections() > pool.getMinSize()) {
-        // Check if the pool hasn't been active in a while or not 50% are used
-        long timeSinceLastActive = Time.now() - pool.getLastActiveTime();
-        int total = pool.getNumConnections();
-        int active = getNumActiveConnections();
-        if (timeSinceLastActive > connectionCleanupPeriodMs ||
-            active < MIN_ACTIVE_RATIO * total) {
-          // Remove and close 1 connection
-          List<ConnectionContext> conns = pool.removeConnections(1);
-          for (ConnectionContext conn : conns) {
-            conn.close();
-          }
-          LOG.debug("Removed connection {} used {} seconds ago. " +
-              "Pool has {}/{} connections", pool.getConnectionPoolId(),
-              TimeUnit.MILLISECONDS.toSeconds(timeSinceLastActive),
-              pool.getNumConnections(), pool.getMaxSize());
-        }
-      }
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c2b969e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java
index a3a78de..6e1ee9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -99,6 +100,11 @@ public class ConnectionPoolId implements Comparable<ConnectionPoolId> {
     return ret;
   }
 
+  @VisibleForTesting
+  UserGroupInformation getUgi() {
+    return this.ugi;
+  }
+
   /**
    * Get the token identifiers for this connection.
    * @return List with the token identifiers.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c2b969e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
new file mode 100644
index 0000000..fe9f195
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test functionalities of {@link ConnectionManager}, which manages a pool
+ * of connections to NameNodes.
+ */
+public class TestConnectionManager {
+  private Configuration conf;
+  private ConnectionManager connManager;
+  private static final String[] TEST_GROUP = new String[]{"TEST_GROUP"};
+  private static final UserGroupInformation TEST_USER1 =
+      UserGroupInformation.createUserForTesting("user1", TEST_GROUP);
+  private static final UserGroupInformation TEST_USER2 =
+      UserGroupInformation.createUserForTesting("user2", TEST_GROUP);
+  private static final String TEST_NN_ADDRESS = "nn1:8080";
+
+  @Before
+  public void setup() throws Exception {
+    conf = new Configuration();
+    connManager = new ConnectionManager(conf);
+    NetUtils.addStaticResolution("nn1", "localhost");
+    NetUtils.createSocketAddrForHost("nn1", 8080);
+    connManager.start();
+  }
+
+  @After
+  public void shutdown() {
+    if (connManager != null) {
+      connManager.close();
+    }
+  }
+
+  @Test
+  public void testCleanup() throws Exception {
+    Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
+
+    ConnectionPool pool1 = new ConnectionPool(
+        conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10);
+    addConnectionsToPool(pool1, 9, 4);
+    poolMap.put(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS), pool1);
+
+    ConnectionPool pool2 = new ConnectionPool(
+        conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10);
+    addConnectionsToPool(pool2, 10, 10);
+    poolMap.put(new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS), pool2);
+
+    checkPoolConnections(TEST_USER1, 9, 4);
+    checkPoolConnections(TEST_USER2, 10, 10);
+
+    // Clean up first pool, one connection should be removed, and second pool
+    // should remain the same.
+    connManager.cleanup(pool1);
+    checkPoolConnections(TEST_USER1, 8, 4);
+    checkPoolConnections(TEST_USER2, 10, 10);
+
+    // Clean up the first pool again, it should have no effect since it reached
+    // the MIN_ACTIVE_RATIO.
+    connManager.cleanup(pool1);
+    checkPoolConnections(TEST_USER1, 8, 4);
+    checkPoolConnections(TEST_USER2, 10, 10);
+  }
+
+  private void addConnectionsToPool(ConnectionPool pool, int numTotalConn,
+      int numActiveConn) throws IOException {
+    for (int i = 0; i < numTotalConn; i++) {
+      ConnectionContext cc = pool.newConnection();
+      pool.addConnection(cc);
+      if (i < numActiveConn) {
+        cc.getClient();
+      }
+    }
+  }
+
+  private void checkPoolConnections(UserGroupInformation ugi,
+      int numOfConns, int numOfActiveConns) {
+    for (Map.Entry<ConnectionPoolId, ConnectionPool> e :
+        connManager.getPools().entrySet()) {
+      if (e.getKey().getUgi() == ugi) {
+        assertEquals(numOfConns, e.getValue().getNumConnections());
+        assertEquals(numOfActiveConns, e.getValue().getNumActiveConnections());
+      }
+    }
+  }
+
+}

