HDFS-8826. In Balancer, add an option to specify the source node list so that
the balancer only selects blocks to move from those nodes.
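
For reference, the new -source option is parsed the same way as -include and
-exclude: it takes either a comma-separated host list or "-f <hosts-file>".
A usage sketch (hostnames are hypothetical; per Dispatcher.Util.isIncluded,
either bare hostnames or host:port entries are matched):

  hdfs balancer -source datanodeX,datanodeY
  hdfs balancer -source -f /path/to/source-hosts.txt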


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7ecbfd44
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7ecbfd44
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7ecbfd44

Branch: refs/heads/HDFS-7240
Commit: 7ecbfd44aa57f5f54c214b7fdedda2500be76f51
Parents: 30e342a
Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Authored: Tue Aug 18 19:25:50 2015 -0700
Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Committed: Tue Aug 18 19:25:50 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/util/HostsFileReader.java |   7 +-
 .../org/apache/hadoop/util/StringUtils.java     |  16 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hadoop/hdfs/server/balancer/Balancer.java   | 117 +++++++++-----
 .../hadoop/hdfs/server/balancer/Dispatcher.java |  41 ++---
 .../hdfs/server/balancer/TestBalancer.java      | 156 ++++++++++++++++---
 6 files changed, 228 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
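
At the API level, Balancer.Parameters gains a sourceNodes set that is passed
through to the rebalancing loop. A minimal programmatic sketch mirroring the
new test code (hostnames and the namenodes/conf variables are assumed to be
set up as in TestBalancer; since the Parameters constructor is
package-private, this only works from org.apache.hadoop.hdfs.server.balancer):

  // Move blocks only from two specific datanodes, using defaults elsewhere.
  final Set<String> sourceNodes = new HashSet<>(
      Arrays.asList("datanodeX", "datanodeY"));     // hypothetical hosts
  final Balancer.Parameters p = new Balancer.Parameters(
      BalancingPolicy.Node.INSTANCE, 10.0,          // policy, threshold
      NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
      Collections.<String>emptySet(),               // excludedNodes
      Collections.<String>emptySet(),               // includedNodes
      sourceNodes,
      false);                                       // runDuringUpgrade
  final int exitCode = Balancer.run(namenodes, p, conf);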


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ecbfd44/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
index ae77e6c..cac43c9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
@@ -80,13 +80,14 @@ public class HostsFileReader {
         String[] nodes = line.split("[ \t\n\f\r]+");
         if (nodes != null) {
           for (int i = 0; i < nodes.length; i++) {
-            if (nodes[i].trim().startsWith("#")) {
+            nodes[i] = nodes[i].trim();
+            if (nodes[i].startsWith("#")) {
               // Everything from now on is a comment
               break;
             }
             if (!nodes[i].isEmpty()) {
-              LOG.info("Adding " + nodes[i] + " to the list of " + type +
-                  " hosts from " + filename);
+              LOG.info("Adding a node \"" + nodes[i] + "\" to the list of "
+                  + type + " hosts from " + filename);
               set.add(nodes[i]);
             }
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ecbfd44/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
index 73f9c4f..153270f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.util;
 
-import com.google.common.base.Preconditions;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.net.URI;
@@ -28,7 +27,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -45,6 +43,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.net.NetUtils;
 
+import com.google.common.base.Preconditions;
 import com.google.common.net.InetAddresses;
 
 /**
@@ -379,19 +378,6 @@ public class StringUtils {
     return str.trim().split("\\s*,\\s*");
   }
 
-  /**
-   * Trims all the strings in a Collection<String> and returns a Set<String>.
-   * @param strings
-   * @return
-   */
-  public static Set<String> getTrimmedStrings(Collection<String> strings) {
-    Set<String> trimmedStrings = new HashSet<String>();
-    for (String string: strings) {
-      trimmedStrings.add(string.trim());
-    }
-    return trimmedStrings;
-  }
-
   final public static String[] emptyStringArray = {};
   final public static char COMMA = ',';
   final public static String COMMA_STR = ",";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ecbfd44/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 9092fcb..038a646 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -808,6 +808,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-8435. Support CreateFlag in WebHDFS. (Jakob Homan via cdouglas)
 
+    HDFS-8826. In Balancer, add an option to specify the source node list
+    so that balancer only selects blocks to move from those nodes.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ecbfd44/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index e92ed81..fe6e4c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -27,6 +27,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -54,6 +55,7 @@ import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -188,6 +190,7 @@ public class Balancer {
   private final Dispatcher dispatcher;
   private final NameNodeConnector nnc;
   private final BalancingPolicy policy;
+  private final Set<String> sourceNodes;
   private final boolean runDuringUpgrade;
   private final double threshold;
   private final long maxSizeToMove;
@@ -260,11 +263,12 @@ public class Balancer {
         DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
 
     this.nnc = theblockpool;
-    this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
-        p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads,
+    this.dispatcher = new Dispatcher(theblockpool, p.includedNodes,
+        p.excludedNodes, movedWinWidth, moverThreads, dispatcherThreads,
         maxConcurrentMovesPerNode, getBlocksSize, getBlocksMinBlockSize, conf);
     this.threshold = p.threshold;
     this.policy = p.policy;
+    this.sourceNodes = p.sourceNodes;
     this.runDuringUpgrade = p.runDuringUpgrade;
 
     this.maxSizeToMove = getLong(conf,
@@ -318,14 +322,23 @@ public class Balancer {
     long overLoadedBytes = 0L, underLoadedBytes = 0L;
     for(DatanodeStorageReport r : reports) {
       final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
+      final boolean isSource = Util.isIncluded(sourceNodes, dn.getDatanodeInfo());
       for(StorageType t : StorageType.getMovableTypes()) {
         final Double utilization = policy.getUtilization(r, t);
         if (utilization == null) { // datanode does not have such storage type 
           continue;
         }
         
+        final double average = policy.getAvgUtilization(t);
+        if (utilization >= average && !isSource) {
+          LOG.info(dn + "[" + t + "] has utilization=" + utilization
+              + " >= average=" + average
+              + " but it is not specified as a source; skipping it.");
+          continue;
+        }
+
+        final double utilizationDiff = utilization - average;
         final long capacity = getCapacity(r, t);
-        final double utilizationDiff = utilization - policy.getAvgUtilization(t);
         final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
         final long maxSize2Move = computeMaxSize2Move(capacity,
             getRemaining(r, t), utilizationDiff, maxSizeToMove);
@@ -623,6 +636,9 @@ public class Balancer {
             DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000;
     LOG.info("namenodes  = " + namenodes);
     LOG.info("parameters = " + p);
+    LOG.info("included nodes = " + p.includedNodes);
+    LOG.info("excluded nodes = " + p.excludedNodes);
+    LOG.info("source nodes = " + p.sourceNodes);
     
    System.out.println("Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved");
     
@@ -686,29 +702,35 @@ public class Balancer {
     static final Parameters DEFAULT = new Parameters(
         BalancingPolicy.Node.INSTANCE, 10.0,
         NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
-        Collections.<String> emptySet(), Collections.<String> emptySet(),
+        Collections.<String>emptySet(), Collections.<String>emptySet(),
+        Collections.<String>emptySet(),
         false);
 
     final BalancingPolicy policy;
     final double threshold;
     final int maxIdleIteration;
-    // exclude the nodes in this set from balancing operations
-    Set<String> nodesToBeExcluded;
-    //include only these nodes in balancing operations
-    Set<String> nodesToBeIncluded;
+    /** Exclude the nodes in this set. */
+    final Set<String> excludedNodes;
+    /** If empty, include any node; otherwise, include only these nodes. */
+    final Set<String> includedNodes;
+    /** If empty, any node can be a source;
+     *  otherwise, use only these nodes as source nodes.
+     */
+    final Set<String> sourceNodes;
     /**
      * Whether to run the balancer during upgrade.
      */
     final boolean runDuringUpgrade;
 
     Parameters(BalancingPolicy policy, double threshold, int maxIdleIteration,
-        Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded,
-        boolean runDuringUpgrade) {
+        Set<String> excludedNodes, Set<String> includedNodes,
+        Set<String> sourceNodes, boolean runDuringUpgrade) {
       this.policy = policy;
       this.threshold = threshold;
       this.maxIdleIteration = maxIdleIteration;
-      this.nodesToBeExcluded = nodesToBeExcluded;
-      this.nodesToBeIncluded = nodesToBeIncluded;
+      this.excludedNodes = excludedNodes;
+      this.includedNodes = includedNodes;
+      this.sourceNodes = sourceNodes;
       this.runDuringUpgrade = runDuringUpgrade;
     }
 
@@ -716,13 +738,14 @@ public class Balancer {
     public String toString() {
       return String.format("%s.%s [%s,"
               + " threshold = %s,"
-              + " max idle iteration = %s, "
-              + "number of nodes to be excluded = %s,"
-              + " number of nodes to be included = %s,"
+              + " max idle iteration = %s,"
+              + " #excluded nodes = %s,"
+              + " #included nodes = %s,"
+              + " #source nodes = %s,"
               + " run during upgrade = %s]",
           Balancer.class.getSimpleName(), getClass().getSimpleName(),
           policy, threshold, maxIdleIteration,
-          nodesToBeExcluded.size(), nodesToBeIncluded.size(),
+          excludedNodes.size(), includedNodes.size(), sourceNodes.size(),
           runDuringUpgrade);
     }
   }
@@ -763,8 +786,9 @@ public class Balancer {
       BalancingPolicy policy = Parameters.DEFAULT.policy;
       double threshold = Parameters.DEFAULT.threshold;
       int maxIdleIteration = Parameters.DEFAULT.maxIdleIteration;
-      Set<String> nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded;
-      Set<String> nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded;
+      Set<String> excludedNodes = Parameters.DEFAULT.excludedNodes;
+      Set<String> includedNodes = Parameters.DEFAULT.includedNodes;
+      Set<String> sourceNodes = Parameters.DEFAULT.sourceNodes;
       boolean runDuringUpgrade = Parameters.DEFAULT.runDuringUpgrade;
 
       if (args != null) {
@@ -796,29 +820,14 @@ public class Balancer {
                 throw e;
               }
             } else if ("-exclude".equalsIgnoreCase(args[i])) {
-              checkArgument(++i < args.length,
-                  "List of nodes to exclude | -f <filename> is missing: args = 
"
-                  + Arrays.toString(args));
-              if ("-f".equalsIgnoreCase(args[i])) {
-                checkArgument(++i < args.length,
-                    "File containing nodes to exclude is not specified: args = 
"
-                    + Arrays.toString(args));
-                nodesTobeExcluded = Util.getHostListFromFile(args[i], "exclude");
-              } else {
-                nodesTobeExcluded = Util.parseHostList(args[i]);
-              }
+              excludedNodes = new HashSet<>();
+              i = processHostList(args, i, "exclude", excludedNodes);
             } else if ("-include".equalsIgnoreCase(args[i])) {
-              checkArgument(++i < args.length,
-                "List of nodes to include | -f <filename> is missing: args = "
-                + Arrays.toString(args));
-              if ("-f".equalsIgnoreCase(args[i])) {
-                checkArgument(++i < args.length,
-                    "File containing nodes to include is not specified: args = 
"
-                    + Arrays.toString(args));
-                nodesTobeIncluded = Util.getHostListFromFile(args[i], "include");
-               } else {
-                nodesTobeIncluded = Util.parseHostList(args[i]);
-              }
+              includedNodes = new HashSet<>();
+              i = processHostList(args, i, "include", includedNodes);
+            } else if ("-source".equalsIgnoreCase(args[i])) {
+              sourceNodes = new HashSet<>();
+              i = processHostList(args, i, "source", sourceNodes);
             } else if ("-idleiterations".equalsIgnoreCase(args[i])) {
               checkArgument(++i < args.length,
                   "idleiterations value is missing: args = " + Arrays
@@ -836,7 +845,7 @@ public class Balancer {
                   + Arrays.toString(args));
             }
           }
-          checkArgument(nodesTobeExcluded.isEmpty() || nodesTobeIncluded.isEmpty(),
+          checkArgument(excludedNodes.isEmpty() || includedNodes.isEmpty(),
               "-exclude and -include options cannot be specified together.");
         } catch(RuntimeException e) {
           printUsage(System.err);
@@ -845,7 +854,31 @@ public class Balancer {
       }
       
       return new Parameters(policy, threshold, maxIdleIteration,
-          nodesTobeExcluded, nodesTobeIncluded, runDuringUpgrade);
+          excludedNodes, includedNodes, sourceNodes, runDuringUpgrade);
+    }
+
+    private static int processHostList(String[] args, int i, String type,
+        Set<String> nodes) {
+      Preconditions.checkArgument(++i < args.length,
+          "List of %s nodes | -f <filename> is missing: args=%s",
+          type, Arrays.toString(args));
+      if ("-f".equalsIgnoreCase(args[i])) {
+        Preconditions.checkArgument(++i < args.length,
+            "File containing %s nodes is not specified: args=%s",
+            type, Arrays.toString(args));
+
+        final String filename = args[i];
+        try {
+          HostsFileReader.readFileToSet(type, filename, nodes);
+        } catch (IOException e) {
+          throw new IllegalArgumentException(
+              "Failed to read " + type + " node list from file: " + filename);
+        }
+      } else {
+        final String[] addresses = StringUtils.getTrimmedStrings(args[i]);
+        nodes.addAll(Arrays.asList(addresses));
+      }
+      return i;
     }
 
     private static void printUsage(PrintStream out) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ecbfd44/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index f9847ca..1a70bd3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -28,7 +28,6 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumMap;
 import java.util.HashMap;
@@ -70,7 +69,6 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 
@@ -796,7 +794,11 @@ public class Dispatcher {
         if (shouldFetchMoreBlocks()) {
           // fetch new blocks
           try {
-            blocksToReceive -= getBlockList();
+            final long received = getBlockList();
+            if (received == 0) {
+              return;
+            }
+            blocksToReceive -= received;
             continue;
           } catch (IOException e) {
             LOG.warn("Exception while getting block list", e);
@@ -925,8 +927,11 @@ public class Dispatcher {
 
     if (decommissioned || decommissioning || excluded || notIncluded) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Excluding datanode " + dn + ": " + decommissioned + ", "
-            + decommissioning + ", " + excluded + ", " + notIncluded);
+        LOG.trace("Excluding datanode " + dn
+            + ": decommissioned=" + decommissioned
+            + ", decommissioning=" + decommissioning
+            + ", excluded=" + excluded
+            + ", notIncluded=" + notIncluded);
       }
       return true;
     }
@@ -1213,31 +1218,5 @@ public class Dispatcher {
       }
       return (nodes.contains(host) || nodes.contains(host + ":" + port));
     }
-
-    /**
-     * Parse a comma separated string to obtain set of host names
-     * 
-     * @return set of host names
-     */
-    static Set<String> parseHostList(String string) {
-      String[] addrs = StringUtils.getTrimmedStrings(string);
-      return new HashSet<String>(Arrays.asList(addrs));
-    }
-
-    /**
-     * Read set of host names from a file
-     * 
-     * @return set of host names
-     */
-    static Set<String> getHostListFromFile(String fileName, String type) {
-      Set<String> nodes = new HashSet<String>();
-      try {
-        HostsFileReader.readFileToSet(type, fileName, nodes);
-        return StringUtils.getTrimmedStrings(nodes);
-      } catch (IOException e) {
-        throw new IllegalArgumentException(
-            "Failed to read host list from file: " + fileName);
-      }
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ecbfd44/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index f8f4329..c1ed758 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -33,6 +33,7 @@ import static org.junit.Assume.assumeTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -397,11 +398,11 @@ public class TestBalancer {
     long timeout = TIMEOUT;
     long failtime = (timeout <= 0L) ? Long.MAX_VALUE
         : Time.monotonicNow() + timeout;
-    if (!p.nodesToBeIncluded.isEmpty()) {
-      totalCapacity = p.nodesToBeIncluded.size() * CAPACITY;
+    if (!p.includedNodes.isEmpty()) {
+      totalCapacity = p.includedNodes.size() * CAPACITY;
     }
-    if (!p.nodesToBeExcluded.isEmpty()) {
-        totalCapacity -= p.nodesToBeExcluded.size() * CAPACITY;
+    if (!p.excludedNodes.isEmpty()) {
+        totalCapacity -= p.excludedNodes.size() * CAPACITY;
     }
     final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
     boolean balanced;
@@ -414,12 +415,12 @@ public class TestBalancer {
       for (DatanodeInfo datanode : datanodeReport) {
         double nodeUtilization = ((double)datanode.getDfsUsed())
             / datanode.getCapacity();
-        if (Dispatcher.Util.isExcluded(p.nodesToBeExcluded, datanode)) {
+        if (Dispatcher.Util.isExcluded(p.excludedNodes, datanode)) {
           assertTrue(nodeUtilization == 0);
           actualExcludedNodeCount++;
           continue;
         }
-        if (!Dispatcher.Util.isIncluded(p.nodesToBeIncluded, datanode)) {
+        if (!Dispatcher.Util.isIncluded(p.includedNodes, datanode)) {
           assertTrue(nodeUtilization == 0);
           actualExcludedNodeCount++;
           continue;
@@ -642,6 +643,7 @@ public class TestBalancer {
             Balancer.Parameters.DEFAULT.threshold,
             Balancer.Parameters.DEFAULT.maxIdleIteration,
             nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded(),
+            Balancer.Parameters.DEFAULT.sourceNodes,
             false);
       }
 
@@ -754,36 +756,36 @@ public class TestBalancer {
     args.add("datanode");
 
     File excludeHostsFile = null;
-    if (!p.nodesToBeExcluded.isEmpty()) {
+    if (!p.excludedNodes.isEmpty()) {
       args.add("-exclude");
       if (useFile) {
         excludeHostsFile = new File ("exclude-hosts-file");
         PrintWriter pw = new PrintWriter(excludeHostsFile);
-        for (String host: p.nodesToBeExcluded) {
+        for (String host: p.excludedNodes) {
           pw.write( host + "\n");
         }
         pw.close();
         args.add("-f");
         args.add("exclude-hosts-file");
       } else {
-        args.add(StringUtils.join(p.nodesToBeExcluded, ','));
+        args.add(StringUtils.join(p.excludedNodes, ','));
       }
     }
 
     File includeHostsFile = null;
-    if (!p.nodesToBeIncluded.isEmpty()) {
+    if (!p.includedNodes.isEmpty()) {
       args.add("-include");
       if (useFile) {
         includeHostsFile = new File ("include-hosts-file");
         PrintWriter pw = new PrintWriter(includeHostsFile);
-        for (String host: p.nodesToBeIncluded){
+        for (String host: p.includedNodes){
           pw.write( host + "\n");
         }
         pw.close();
         args.add("-f");
         args.add("include-hosts-file");
       } else {
-        args.add(StringUtils.join(p.nodesToBeIncluded, ','));
+        args.add(StringUtils.join(p.includedNodes, ','));
       }
     }
 
@@ -881,7 +883,8 @@ public class TestBalancer {
           Balancer.Parameters.DEFAULT.policy,
           Balancer.Parameters.DEFAULT.threshold,
           Balancer.Parameters.DEFAULT.maxIdleIteration,
-          datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded,
+          datanodes, Balancer.Parameters.DEFAULT.includedNodes,
+          Balancer.Parameters.DEFAULT.sourceNodes,
           false);
       final int r = Balancer.run(namenodes, p, conf);
       assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
@@ -1092,7 +1095,7 @@ public class TestBalancer {
     excludeHosts.add( "datanodeZ");
     doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
         new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
-        excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), false, false);
+        excludeHosts, Parameters.DEFAULT.includedNodes), false, false);
   }
 
   /**
@@ -1122,7 +1125,7 @@ public class TestBalancer {
     excludeHosts.add( "datanodeZ");
     doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
       new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, excludeHosts,
-      Parameters.DEFAULT.nodesToBeIncluded), true, false);
+      Parameters.DEFAULT.includedNodes), true, false);
   }
 
   /**
@@ -1152,7 +1155,7 @@ public class TestBalancer {
     excludeHosts.add( "datanodeZ");
     doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
         new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
-        excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), true, true);
+        excludeHosts, Parameters.DEFAULT.includedNodes), true, true);
   }
 
   /**
@@ -1181,7 +1184,7 @@ public class TestBalancer {
     includeHosts.add( "datanodeY");
     doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
         new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
-        Parameters.DEFAULT.nodesToBeExcluded, includeHosts), false, false);
+        Parameters.DEFAULT.excludedNodes, includeHosts), false, false);
   }
 
   /**
@@ -1210,7 +1213,7 @@ public class TestBalancer {
     includeHosts.add( "datanodeY");
     doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
         new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
-        Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, false);
+        Parameters.DEFAULT.excludedNodes, includeHosts), true, false);
   }
 
   /**
@@ -1239,7 +1242,7 @@ public class TestBalancer {
     includeHosts.add( "datanodeY");
     doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
         new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
-        Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, true);
+        Parameters.DEFAULT.excludedNodes, includeHosts), true, true);
   }
 
   /**
@@ -1379,8 +1382,9 @@ public class TestBalancer {
           new Balancer.Parameters(Parameters.DEFAULT.policy,
               Parameters.DEFAULT.threshold,
               Parameters.DEFAULT.maxIdleIteration,
-              Parameters.DEFAULT.nodesToBeExcluded,
-              Parameters.DEFAULT.nodesToBeIncluded,
+              Parameters.DEFAULT.excludedNodes,
+              Parameters.DEFAULT.includedNodes,
+              Parameters.DEFAULT.sourceNodes,
               true);
       assertEquals(ExitStatus.SUCCESS.getExitCode(),
           Balancer.run(namenodes, runDuringUpgrade, conf));
@@ -1536,6 +1540,116 @@ public class TestBalancer {
     }
   }
 
+  /** Test that Balancer does not move blocks with size < minBlockSize, and test the -source option. */
+  @Test(timeout=60000)
+  public void testMinBlockSizeAndSourceNodes() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+ 
+    final short replication = 3;
+    final long[] lengths = {10, 10, 10, 10}; 
+    final long[] capacities = new long[replication];
+    final long totalUsed = capacities.length * sum(lengths);
+    Arrays.fill(capacities, 1000);
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(capacities.length)
+        .simulatedCapacities(capacities)
+        .build();
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+
+    try {
+      cluster.waitActive();
+      client = NameNodeProxies.createProxy(conf, dfs.getUri(),
+          ClientProtocol.class).getProxy();
+      
+      // write a few small files so that the initial datanodes hold some blocks
+      for(int i = 0; i < lengths.length; i++) {
+        final long size = lengths[i];
+        final Path p = new Path("/file" + i + "_size" + size);
+        try(final OutputStream out = dfs.create(p)) {
+          for(int j = 0; j < size; j++) {
+            out.write(j);
+          }
+        }
+      }
+      
+      // start up an equal number of empty datanodes with the same capacities
+      cluster.startDataNodes(conf, capacities.length, true, null, null, capacities);
+      LOG.info("capacities    = " + Arrays.toString(capacities));
+      LOG.info("totalUsedSpace= " + totalUsed);
+      LOG.info("lengths       = " + Arrays.toString(lengths) + ", #=" + 
lengths.length);
+      waitForHeartBeat(totalUsed, 2*capacities[0]*capacities.length, client, 
cluster);
+      
+      final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+
+      { // run Balancer with min-block-size=50
+        final Parameters p = new Parameters(
+            BalancingPolicy.Node.INSTANCE, 1,
+            NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+            Collections.<String> emptySet(), Collections.<String> emptySet(),
+            Collections.<String> emptySet(), false);
+
+        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
+        final int r = Balancer.run(namenodes, p, conf);
+        assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+      }
+      
+      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+
+      { // run Balancer with empty nodes as source nodes
+        final Set<String> sourceNodes = new HashSet<>();
+        final List<DataNode> datanodes = cluster.getDataNodes();
+        for(int i = capacities.length; i < datanodes.size(); i++) {
+          sourceNodes.add(datanodes.get(i).getDisplayName());
+        }
+        final Parameters p = new Parameters(
+          BalancingPolicy.Node.INSTANCE, 1,
+          NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+          Collections.<String> emptySet(), Collections.<String> emptySet(),
+          sourceNodes, false);
+
+        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
+        final int r = Balancer.run(namenodes, p, conf);
+        assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+      }
+
+      { // run Balancer with a filled node as a source node
+        final Set<String> sourceNodes = new HashSet<>();
+        final List<DataNode> datanodes = cluster.getDataNodes();
+        sourceNodes.add(datanodes.get(0).getDisplayName());
+        final Parameters p = new Parameters(
+          BalancingPolicy.Node.INSTANCE, 1,
+          NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+          Collections.<String> emptySet(), Collections.<String> emptySet(),
+          sourceNodes, false);
+
+        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+        final int r = Balancer.run(namenodes, p, conf);
+        assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+      }
+
+      { // run Balancer with all the filled nodes as source nodes
+        final Set<String> sourceNodes = new HashSet<>();
+        final List<DataNode> datanodes = cluster.getDataNodes();
+        for(int i = 0; i < capacities.length; i++) {
+          sourceNodes.add(datanodes.get(i).getDisplayName());
+        }
+        final Parameters p = new Parameters(
+          BalancingPolicy.Node.INSTANCE, 1,
+          NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+          Collections.<String> emptySet(), Collections.<String> emptySet(),
+          sourceNodes, false);
+
+        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+        final int r = Balancer.run(namenodes, p, conf);
+        assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
   /**
    * @param args
    */
