HDFS-10968. BlockManager#isInNewRack should consider decommissioning nodes. 
Contributed by Jing Zhao.
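
This change relaxes BlockManager#isInNewRack. Previously the check returned
false whenever any source node shared the target's rack, even when that
source was decommission-in-progress and would eventually leave the rack.
With the fix, decommissioning sources are skipped, so a target on such a
rack still counts as adding a new rack for the striped block.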


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4d106213
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4d106213
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4d106213

Branch: refs/heads/HADOOP-13037
Commit: 4d106213c0f4835b723c9a50bd8080a9017122d7
Parents: 6a38d11
Author: Jing Zhao <ji...@apache.org>
Authored: Fri Oct 7 22:44:54 2016 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Fri Oct 7 22:44:54 2016 -0700

----------------------------------------------------------------------
 .../server/blockmanagement/BlockManager.java    |   6 +-
 ...constructStripedBlocksWithRackAwareness.java | 158 +++++++++++++++----
 2 files changed, 130 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d106213/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 8b74609..7949439 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1781,8 +1781,12 @@ public class BlockManager implements BlockStatsMXBean {
 
   private boolean isInNewRack(DatanodeDescriptor[] srcs,
       DatanodeDescriptor target) {
+    LOG.debug("check if target {} increases racks, srcs={}", target,
+        Arrays.asList(srcs));
     for (DatanodeDescriptor src : srcs) {
-      if (src.getNetworkLocation().equals(target.getNetworkLocation())) {
+      if (!src.isDecommissionInProgress() &&
+          src.getNetworkLocation().equals(target.getNetworkLocation())) {
+        LOG.debug("the target {} is in the same rack with src {}", target, src);
         return false;
       }
     }

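The updated predicate as a minimal standalone sketch (the Node class and the
wrapper class below are simplified, hypothetical stand-ins for
DatanodeDescriptor and its accessors, not the actual Hadoop API):

  // Hypothetical stand-in types, for illustration only (not the Hadoop API).
  public class IsInNewRackSketch {
    static class Node {
      final String rack;             // network location, e.g. "/r1"
      final boolean decommissioning; // isDecommissionInProgress() in HDFS
      Node(String rack, boolean decommissioning) {
        this.rack = rack;
        this.decommissioning = decommissioning;
      }
    }

    static boolean isInNewRack(Node[] srcs, Node target) {
      for (Node src : srcs) {
        // A decommissioning source is expected to leave its rack, so it no
        // longer disqualifies a target on the same rack.
        if (!src.decommissioning && src.rack.equals(target.rack)) {
          return false;
        }
      }
      return true;
    }

    public static void main(String[] args) {
      Node[] srcs = { new Node("/r1", false), new Node("/r2", true) };
      // The only src on /r2 is decommissioning, so a /r2 target adds a rack:
      // prints true; before this change the check would have returned false.
      System.out.println(isInNewRack(srcs, new Node("/r2", false)));
    }
  }
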
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d106213/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
index 152e153..3bc13a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
@@ -35,12 +35,14 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -58,57 +60,44 @@ public class TestReconstructStripedBlocksWithRackAwareness {
     GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL);
   }
 
-  private static final String[] hosts = getHosts();
-  private static final String[] racks = getRacks();
+  private static final String[] hosts =
+      getHosts(NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 1);
+  private static final String[] racks =
+      getRacks(NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 1, NUM_DATA_BLOCKS);
 
-  private static String[] getHosts() {
-    String[] hosts = new String[NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 1];
+  private static String[] getHosts(int numHosts) {
+    String[] hosts = new String[numHosts];
     for (int i = 0; i < hosts.length; i++) {
       hosts[i] = "host" + (i + 1);
     }
     return hosts;
   }
 
-  private static String[] getRacks() {
-    String[] racks = new String[NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 1];
-    int numHostEachRack = (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS - 1) /
-        (NUM_DATA_BLOCKS - 1) + 1;
+  private static String[] getRacks(int numHosts, int numRacks) {
+    String[] racks = new String[numHosts];
+    int numHostEachRack = numHosts / numRacks;
+    int residue = numHosts % numRacks;
     int j = 0;
-    // we have NUM_DATA_BLOCKS racks
-    for (int i = 1; i <= NUM_DATA_BLOCKS; i++) {
-      if (j == racks.length - 1) {
-        assert i == NUM_DATA_BLOCKS;
+    for (int i = 1; i <= numRacks; i++) {
+      int limit = i <= residue ? numHostEachRack + 1 : numHostEachRack;
+      for (int k = 0; k < limit; k++) {
         racks[j++] = "/r" + i;
-      } else {
-        for (int k = 0; k < numHostEachRack && j < racks.length - 1; k++) {
-          racks[j++] = "/r" + i;
-        }
       }
     }
+    assert j == numHosts;
     return racks;
   }
 
   private MiniDFSCluster cluster;
+  private static final HdfsConfiguration conf = new HdfsConfiguration();
   private DistributedFileSystem fs;
-  private FSNamesystem fsn;
-  private BlockManager bm;
 
-  @Before
-  public void setup() throws Exception {
-    final HdfsConfiguration conf = new HdfsConfiguration();
+  @BeforeClass
+  public static void setup() throws Exception {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
         false);
-
-    cluster = new MiniDFSCluster.Builder(conf).racks(racks).hosts(hosts)
-        .numDataNodes(hosts.length).build();
-    cluster.waitActive();
-
-    fsn = cluster.getNamesystem();
-    bm = fsn.getBlockManager();
-
-    fs = cluster.getFileSystem();
-    fs.setErasureCodingPolicy(new Path("/"), null);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
   }
 
   @After
@@ -132,6 +121,15 @@ public class TestReconstructStripedBlocksWithRackAwareness {
     return dnProp;
   }
 
+  private DataNode getDataNode(String host) {
+    for (DataNode dn : cluster.getDataNodes()) {
+      if (dn.getDatanodeId().getHostName().equals(host)) {
+        return dn;
+      }
+    }
+    return null;
+  }
+
   /**
    * When all the internal blocks are available but they are not placed on
    * enough racks, NameNode should avoid normal decoding reconstruction but copy
@@ -143,9 +141,19 @@ public class TestReconstructStripedBlocksWithRackAwareness {
    */
   @Test
   public void testReconstructForNotEnoughRacks() throws Exception {
+    LOG.info("cluster hosts: {}, racks: {}", Arrays.asList(hosts),
+        Arrays.asList(racks));
+
+    cluster = new MiniDFSCluster.Builder(conf).racks(racks).hosts(hosts)
+        .numDataNodes(hosts.length).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    fs.setErasureCodingPolicy(new Path("/"), null);
+    FSNamesystem fsn = cluster.getNamesystem();
+    BlockManager bm = fsn.getBlockManager();
+
     MiniDFSCluster.DataNodeProperties lastHost = stopDataNode(
         hosts[hosts.length - 1]);
-
     final Path file = new Path("/foo");
     // the file's block is in 9 dn but 5 racks
     DFSTestUtil.createFile(fs, file,
@@ -206,6 +214,12 @@ public class TestReconstructStripedBlocksWithRackAwareness {
 
   @Test
   public void testChooseExcessReplicasToDelete() throws Exception {
+    cluster = new MiniDFSCluster.Builder(conf).racks(racks).hosts(hosts)
+        .numDataNodes(hosts.length).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    fs.setErasureCodingPolicy(new Path("/"), null);
+
     MiniDFSCluster.DataNodeProperties lastHost = stopDataNode(
         hosts[hosts.length - 1]);
 
@@ -242,4 +256,82 @@ public class TestReconstructStripedBlocksWithRackAwareness {
       Assert.assertFalse(dn.getHostName().equals("host1"));
     }
   }
+
+  /**
+   * In case we have 10 internal blocks on 5 racks, where 9 of the blocks are
+   * live and 1 is decommissioning, make sure the reconstruction happens correctly.
+   */
+  @Test
+  public void testReconstructionWithDecommission() throws Exception {
+    final String[] racks = getRacks(NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2,
+        NUM_DATA_BLOCKS);
+    final String[] hosts = getHosts(NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2);
+    // we now have 11 hosts on 6 racks with distribution: 2-2-2-2-2-1
+    cluster = new MiniDFSCluster.Builder(conf).racks(racks).hosts(hosts)
+        .numDataNodes(hosts.length).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    fs.setErasureCodingPolicy(new Path("/"), null);
+
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+    final DatanodeManager dm = bm.getDatanodeManager();
+
+    // stop h9 and h10 and create a file with 6+3 internal blocks
+    MiniDFSCluster.DataNodeProperties h9 = stopDataNode(hosts[hosts.length - 3]);
+    MiniDFSCluster.DataNodeProperties h10 = stopDataNode(hosts[hosts.length - 2]);
+    final Path file = new Path("/foo");
+    DFSTestUtil.createFile(fs, file,
+        BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS * 2, (short) 1, 0L);
+    final BlockInfo blockInfo = cluster.getNamesystem().getFSDirectory()
+        .getINode(file.toString()).asFile().getLastBlock();
+
+    // bring h9 back
+    cluster.restartDataNode(h9);
+    cluster.waitActive();
+
+    // stop h11 so that the reconstruction happens
+    MiniDFSCluster.DataNodeProperties h11 = stopDataNode(hosts[hosts.length - 1]);
+    boolean recovered = bm.countNodes(blockInfo).liveReplicas() >=
+        NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
+    for (int i = 0; i < 10 && !recovered; i++) {
+      Thread.sleep(1000);
+      recovered = bm.countNodes(blockInfo).liveReplicas() >=
+          NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
+    }
+    Assert.assertTrue(recovered);
+
+    // mark h9 as decommissioning
+    DataNode datanode9 = getDataNode(hosts[hosts.length - 3]);
+    Assert.assertNotNull(datanode9);
+    final DatanodeDescriptor dn9 = dm.getDatanode(datanode9.getDatanodeId());
+    dn9.startDecommission();
+
+    // restart h10 and h11
+    cluster.restartDataNode(h10);
+    cluster.restartDataNode(h11);
+    cluster.waitActive();
+    DataNodeTestUtils.triggerBlockReport(getDataNode(hosts[hosts.length - 1]));
+
+    // start decommissioning h9
+    boolean satisfied = bm.isPlacementPolicySatisfied(blockInfo);
+    Assert.assertFalse(satisfied);
+    final DecommissionManager decomManager =
+        (DecommissionManager) Whitebox.getInternalState(dm, "decomManager");
+    cluster.getNamesystem().writeLock();
+    try {
+      dn9.stopDecommission();
+      decomManager.startDecommission(dn9);
+    } finally {
+      cluster.getNamesystem().writeUnlock();
+    }
+
+    // make sure the decommission finishes and the block is on 6 racks
+    boolean decommissioned = dn9.isDecommissioned();
+    for (int i = 0; i < 10 && !decommissioned; i++) {
+      Thread.sleep(1000);
+      decommissioned = dn9.isDecommissioned();
+    }
+    Assert.assertTrue(decommissioned);
+    Assert.assertTrue(bm.isPlacementPolicySatisfied(blockInfo));
+  }
 }
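
A worked example of the new getRacks(numHosts, numRacks) distribution, using
the values from testReconstructionWithDecommission above: numHostEachRack =
11 / 6 = 1 and residue = 11 % 6 = 5, so the first five racks get
numHostEachRack + 1 = 2 hosts each and the sixth gets 1, producing the
2-2-2-2-2-1 layout the test comment describes. The same loop, wrapped in a
hypothetical main method for a quick check:

  public class RackLayoutDemo {
    public static void main(String[] args) {
      int numHosts = 11, numRacks = 6;
      String[] racks = new String[numHosts];
      int numHostEachRack = numHosts / numRacks; // 1
      int residue = numHosts % numRacks;         // 5
      int j = 0;
      for (int i = 1; i <= numRacks; i++) {
        // racks 1..residue get one extra host each
        int limit = i <= residue ? numHostEachRack + 1 : numHostEachRack;
        for (int k = 0; k < limit; k++) {
          racks[j++] = "/r" + i;
        }
      }
      // prints [/r1, /r1, /r2, /r2, /r3, /r3, /r4, /r4, /r5, /r5, /r6]
      System.out.println(java.util.Arrays.toString(racks));
    }
  }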

