HDFS-10824. MiniDFSCluster#storageCapacities has no effects on real capacity. 
Contributed by Xiaobing Zhou.
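
For context, here is a minimal sketch (modeled on the new TestMiniDFSCluster test in this patch) of how the Builder's storageCapacities option is expected to behave once this change is in. The capacity values and the single-datanode setup are illustrative assumptions, not part of the patch:

    // Minimal sketch; assumes the same imports as the new TestMiniDFSCluster
    // code below (MiniDFSCluster, StorageType, FsDatasetSpi, FsVolumeImpl,
    // org.junit.Assert.assertEquals) and an HdfsConfiguration named conf.
    long[] capacities = new long[] {10L * 1024 * 1024, 20L * 1024 * 1024};
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(1)
        .storagesPerDatanode(2)
        .storageTypes(new StorageType[] {StorageType.DISK, StorageType.DISK})
        .storageCapacities(capacities)
        .build();
    cluster.waitActive();

    // With this change the requested capacities are applied to each volume
    // (via FsVolumeImpl#setCapacityForTesting) and re-applied after a
    // datanode restart, so assertions like these should hold.
    try (FsDatasetSpi.FsVolumeReferences refs =
        cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) {
      assertEquals(capacities[0], ((FsVolumeImpl) refs.get(0)).getCapacity());
      assertEquals(capacities[1], ((FsVolumeImpl) refs.get(1)).getCapacity());
    }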


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c3b235e5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c3b235e5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c3b235e5

Branch: refs/heads/HDFS-10467
Commit: c3b235e56597d55387b4003e376faee10b473d55
Parents: e19b37e
Author: Arpit Agarwal <a...@apache.org>
Authored: Wed Sep 28 11:47:37 2016 -0700
Committer: Arpit Agarwal <a...@apache.org>
Committed: Wed Sep 28 11:47:37 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java  | 103 ++++++++++++----
 .../apache/hadoop/hdfs/TestMiniDFSCluster.java  | 119 +++++++++++++++++++
 2 files changed, 199 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3b235e5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 3bb3a10..cf02a8d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -56,6 +56,7 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -547,6 +548,8 @@ public class MiniDFSCluster implements AutoCloseable {
   protected final int storagesPerDatanode;
   private Set<FileSystem> fileSystems = Sets.newHashSet();
 
+  private List<long[]> storageCap = Lists.newLinkedList();
+
   /**
    * A unique instance identifier for the cluster. This
    * is used to disambiguate HA filesystems in the case where
@@ -1648,31 +1651,64 @@ public class MiniDFSCluster implements AutoCloseable {
     }
     this.numDataNodes += numDataNodes;
     waitActive();
-    
+
+    setDataNodeStorageCapacities(
+        curDatanodesNum,
+        numDataNodes,
+        dns,
+        storageCapacities);
+
+    /* remember storage capacities so they can be reapplied on datanode restart */
+    if (storageCapacities != null) {
+      storageCap.addAll(Arrays.asList(storageCapacities));
+    }
+  }
+
+  private synchronized void setDataNodeStorageCapacities(
+      final int curDatanodesNum,
+      final int numDNs,
+      final DataNode[] dns,
+      long[][] storageCapacities) throws IOException {
     if (storageCapacities != null) {
-      for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; ++i) {
+      for (int i = curDatanodesNum; i < curDatanodesNum + numDNs; ++i) {
         final int index = i - curDatanodesNum;
-        try (FsDatasetSpi.FsVolumeReferences volumes =
-            dns[index].getFSDataset().getFsVolumeReferences()) {
-          assert storageCapacities[index].length == storagesPerDatanode;
-          assert volumes.size() == storagesPerDatanode;
-
-          int j = 0;
-          for (FsVolumeSpi fvs : volumes) {
-            FsVolumeImpl volume = (FsVolumeImpl) fvs;
-            LOG.info("setCapacityForTesting " + storageCapacities[index][j]
-                + " for [" + volume.getStorageType() + "]" + volume
-                .getStorageID());
-            volume.setCapacityForTesting(storageCapacities[index][j]);
-            j++;
-          }
-        }
+        setDataNodeStorageCapacities(index, dns[index], storageCapacities);
       }
     }
   }
-  
-  
-  
+
+  private synchronized void setDataNodeStorageCapacities(
+      final int curDnIdx,
+      final DataNode curDn,
+      long[][] storageCapacities) throws IOException {
+
+    if (storageCapacities == null || storageCapacities.length == 0) {
+      return;
+    }
+
+    try {
+      waitDataNodeFullyStarted(curDn);
+    } catch (TimeoutException | InterruptedException e) {
+      throw new IOException(e);
+    }
+
+    try (FsDatasetSpi.FsVolumeReferences volumes = curDn.getFSDataset()
+        .getFsVolumeReferences()) {
+      assert storageCapacities[curDnIdx].length == storagesPerDatanode;
+      assert volumes.size() == storagesPerDatanode;
+
+      int j = 0;
+      for (FsVolumeSpi fvs : volumes) {
+        FsVolumeImpl volume = (FsVolumeImpl) fvs;
+        LOG.info("setCapacityForTesting " + storageCapacities[curDnIdx][j]
+            + " for [" + volume.getStorageType() + "]" + 
volume.getStorageID());
+        volume.setCapacityForTesting(storageCapacities[curDnIdx][j]);
+        j++;
+      }
+    }
+    DataNodeTestUtils.triggerHeartbeat(curDn);
+  }
+
   /**
    * Modify the config and start up the DataNodes.  The info port for
    * DataNodes is guaranteed to use a free port.
@@ -2236,6 +2272,16 @@ public class MiniDFSCluster implements AutoCloseable {
     return restartDataNode(dnprop, false);
   }
 
+  private void waitDataNodeFullyStarted(final DataNode dn)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return dn.isDatanodeFullyStarted();
+      }
+    }, 100, 60000);
+  }
+
   /**
    * Restart a datanode, on the same port if requested
    * @param dnprop the datanode to restart
@@ -2256,10 +2302,21 @@ public class MiniDFSCluster implements AutoCloseable {
       conf.set(DFS_DATANODE_IPC_ADDRESS_KEY,
           addr.getAddress().getHostAddress() + ":" + dnprop.ipcPort); 
     }
-    DataNode newDn = DataNode.createDataNode(args, conf, secureResources);
-    dataNodes.add(new DataNodeProperties(
-        newDn, newconf, args, secureResources, newDn.getIpcPort()));
+    final DataNode newDn = DataNode.createDataNode(args, conf, secureResources);
+
+    final DataNodeProperties dnp = new DataNodeProperties(
+        newDn,
+        newconf,
+        args,
+        secureResources,
+        newDn.getIpcPort());
+    dataNodes.add(dnp);
     numDataNodes++;
+
+    setDataNodeStorageCapacities(
+        dataNodes.lastIndexOf(dnp),
+        newDn,
+        storageCap.toArray(new long[][]{}));
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3b235e5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMiniDFSCluster.java
index 4d027dc..3d4cc72 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMiniDFSCluster.java
@@ -25,16 +25,25 @@ import static org.junit.Assume.assumeTrue;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.test.PathUtils;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Tests MiniDFS cluster setup/teardown and isolation.
  * Every instance is brought up with a new data dir, to ensure that
@@ -78,6 +87,116 @@ public class TestMiniDFSCluster {
     }
   }
 
+  /**
+   * Tests that storage capacity settings remain effective after cluster restarts.
+   */
+  @Test(timeout=100000)
+  public void testClusterSetStorageCapacity() throws Throwable {
+
+    final Configuration conf = new HdfsConfiguration();
+    final int numDatanodes = 1;
+    final int defaultBlockSize = 1024;
+    final int blocks = 100;
+    final int blocksSize = 1024;
+    final int fileLen = blocks * blocksSize;
+    final long capacity = defaultBlockSize * 2 * fileLen;
+    final long[] capacities = new long[] {capacity, 2 * capacity};
+
+    final MiniDFSCluster cluster = newCluster(
+            conf,
+            numDatanodes,
+            capacities,
+            defaultBlockSize,
+            fileLen);
+    verifyStorageCapacity(cluster, capacities);
+
+    /* restart all data nodes */
+    cluster.restartDataNodes();
+    cluster.waitActive();
+    verifyStorageCapacity(cluster, capacities);
+
+    /* restart all name nodes */
+    cluster.restartNameNodes();
+    cluster.waitActive();
+    verifyStorageCapacity(cluster, capacities);
+
+    /* restart all name nodes first, then all data nodes */
+    cluster.restartNameNodes();
+    cluster.restartDataNodes();
+    cluster.waitActive();
+    verifyStorageCapacity(cluster, capacities);
+
+    /* restart all data nodes first, then all name nodes */
+    cluster.restartDataNodes();
+    cluster.restartNameNodes();
+    cluster.waitActive();
+    verifyStorageCapacity(cluster, capacities);
+  }
+
+  private void verifyStorageCapacity(
+      final MiniDFSCluster cluster,
+      final long[] capacities) throws IOException {
+
+    FsVolumeImpl source = null;
+    FsVolumeImpl dest = null;
+
+    /* verify capacity */
+    for (int i = 0; i < cluster.getDataNodes().size(); i++) {
+      final DataNode dnNode = cluster.getDataNodes().get(i);
+      try (FsDatasetSpi.FsVolumeReferences refs = dnNode.getFSDataset()
+          .getFsVolumeReferences()) {
+        source = (FsVolumeImpl) refs.get(0);
+        dest = (FsVolumeImpl) refs.get(1);
+        assertEquals(capacities[0], source.getCapacity());
+        assertEquals(capacities[1], dest.getCapacity());
+      }
+    }
+  }
+
+  private MiniDFSCluster newCluster(
+      final Configuration conf,
+      final int numDatanodes,
+      final long[] storageCapacities,
+      final int defaultBlockSize,
+      final int fileLen)
+      throws IOException, InterruptedException, TimeoutException {
+
+    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultBlockSize);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, defaultBlockSize);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+
+    final String fileName = "/" + UUID.randomUUID().toString();
+    final Path filePath = new Path(fileName);
+
+    Preconditions.checkNotNull(storageCapacities);
+    Preconditions.checkArgument(
+        storageCapacities.length == 2,
+        "need to specify capacities for two storages.");
+
+    /* bring up the cluster and write a test file */
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDatanodes)
+        .storageCapacities(storageCapacities)
+        .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
+        .storagesPerDatanode(2)
+        .build();
+    cluster.waitActive();
+
+    final short replicationFactor = (short) 1;
+    final Random r = new Random();
+    FileSystem fs = cluster.getFileSystem(0);
+    DFSTestUtil.createFile(
+        fs,
+        filePath,
+        fileLen,
+        replicationFactor,
+        r.nextLong());
+    DFSTestUtil.waitReplication(fs, filePath, replicationFactor);
+
+    return cluster;
+  }
+
   @Test(timeout=100000)
   public void testIsClusterUpAfterShutdown() throws Throwable {
     Configuration conf = new HdfsConfiguration();

