HDFS-7847. Modify NNThroughputBenchmark to be able to operate on a remote 
NameNode (Charles Lamb via Colin P. McCabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ffce9a34
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ffce9a34
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ffce9a34

Branch: refs/heads/HDFS-7240
Commit: ffce9a3413277a69444fcb890460c885de56db69
Parents: e4c3b52
Author: Colin Patrick Mccabe <cmcc...@cloudera.com>
Authored: Tue May 5 11:27:36 2015 -0700
Committer: Colin Patrick Mccabe <cmcc...@cloudera.com>
Committed: Tue May 5 11:34:58 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  40 ++++++
 .../server/namenode/NNThroughputBenchmark.java  | 136 +++++++++++++------
 3 files changed, 137 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffce9a34/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c89e6fe..01de9b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -510,6 +510,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-7758. Retire FsDatasetSpi#getVolumes() and use
     FsDatasetSpi#getVolumeRefs() instead (Lei (Eddy) Xu via Colin P. McCabe)
 
+    HDFS-7847. Modify NNThroughputBenchmark to be able to operate on a remote
+    NameNode (Charles Lamb via Colin P. McCabe)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffce9a34/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index a8df991..cfee997 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -48,6 +48,7 @@ import java.lang.reflect.Modifier;
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.URI;
 import java.net.URL;
 import java.net.URLConnection;
 import java.nio.ByteBuffer;
@@ -64,6 +65,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
@@ -129,12 +131,14 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -147,6 +151,7 @@ import org.apache.log4j.Level;
 import org.junit.Assume;
 import org.mockito.internal.util.reflection.Whitebox;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -1756,6 +1761,41 @@ public class DFSTestUtil {
   }
 
   /**
+   * Get the NamenodeProtocol RPC proxy for the NN associated with this
+   * DFSClient object
+   *
+   * @param nameNodeUri the URI of the NN to get a proxy for.
+   *
+   * @return the Namenode RPC proxy associated with this DFSClient object
+   */
+  @VisibleForTesting
+  public static NamenodeProtocol getNamenodeProtocolProxy(Configuration conf,
+      URI nameNodeUri, UserGroupInformation ugi)
+      throws IOException {
+    return NameNodeProxies.createNonHAProxy(conf,
+        NameNode.getAddress(nameNodeUri), NamenodeProtocol.class, ugi, false).
+        getProxy();
+  }
+
+  /**
+   * Get the RefreshUserMappingsProtocol RPC proxy for the NN associated with
+   * this DFSClient object
+   *
+   * @param nameNodeUri the URI of the NN to get a proxy for.
+   *
+   * @return the RefreshUserMappingsProtocol RPC proxy associated with this
+   * DFSClient object
+   */
+  @VisibleForTesting
+  public static RefreshUserMappingsProtocol getRefreshUserMappingsProtocolProxy(
+      Configuration conf, URI nameNodeUri) throws IOException {
+    final AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
+    return NameNodeProxies.createProxy(conf,
+        nameNodeUri, RefreshUserMappingsProtocol.class,
+        nnFallbackToSimpleAuth).getProxy();
+  }
+
+  /**
    * Set the datanode dead
    */
   public static void setDatanodeDead(DatanodeInfo dn) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffce9a34/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index db0185d..2964f9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
@@ -30,19 +31,24 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -53,6 +59,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
@@ -63,6 +70,8 @@ import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.security.RefreshUserMappingsProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
@@ -96,6 +105,9 @@ import org.apache.log4j.LogManager;
  * By default the refresh is never called.</li>
  * <li>-keepResults do not clean up the name-space after execution.</li>
  * <li>-useExisting do not recreate the name-space, use existing data.</li>
+ * <li>-namenode will run the test against a namenode in another
+ * process or on another host. If you use this option, the namenode
+ * must have dfs.namenode.fs-limits.min-block-size set to 16.</li>
  * </ol>
  * 
  * The benchmark first generates inputs for each thread so that the
@@ -111,11 +123,20 @@ public class NNThroughputBenchmark implements Tool {
   private static final Log LOG = LogFactory.getLog(NNThroughputBenchmark.class);
   private static final int BLOCK_SIZE = 16;
   private static final String GENERAL_OPTIONS_USAGE = 
-    "     [-keepResults] | [-logLevel L] | [-UGCacheRefreshCount G]";
+    "     [-keepResults] | [-logLevel L] | [-UGCacheRefreshCount G] |" +
+    " [-namenode <namenode URI>]\n" +
+    "     If using -namenode, set the namenode's" +
+    "         dfs.namenode.fs-limits.min-block-size to 16.";
 
   static Configuration config;
   static NameNode nameNode;
-  static NamenodeProtocols nameNodeProto;
+  static NamenodeProtocol nameNodeProto;
+  static ClientProtocol clientProto;
+  static DatanodeProtocol dataNodeProto;
+  static RefreshUserMappingsProtocol refreshUserMappingsProto;
+  static String bpid = null;
+
+  private String namenodeUri = null; // NN URI to use, if specified
 
   NNThroughputBenchmark(Configuration conf) throws IOException {
     config = conf;
@@ -264,7 +285,7 @@ public class NNThroughputBenchmark implements Tool {
         for(StatsDaemon d : daemons)
           d.start();
       } finally {
-        while(isInPorgress()) {
+        while(isInProgress()) {
           // try {Thread.sleep(500);} catch (InterruptedException e) {}
         }
         elapsedTime = Time.now() - start;
@@ -275,7 +296,7 @@ public class NNThroughputBenchmark implements Tool {
       }
     }
 
-    private boolean isInPorgress() {
+    private boolean isInProgress() {
       for(StatsDaemon d : daemons)
         if(d.isInProgress())
           return true;
@@ -283,10 +304,10 @@ public class NNThroughputBenchmark implements Tool {
     }
 
     void cleanUp() throws IOException {
-      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+      clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
           false);
       if(!keepResults)
-        nameNodeProto.delete(getBaseDir(), true);
+        clientProto.delete(getBaseDir(), true);
     }
 
     int getNumOpsExecuted() {
@@ -360,6 +381,12 @@ public class NNThroughputBenchmark implements Tool {
         args.remove(ugrcIndex);
       }
 
+      try {
+        namenodeUri = StringUtils.popOptionWithArgument("-namenode", args);
+      } catch (IllegalArgumentException iae) {
+        printUsage();
+      }
+
       String type = args.get(1);
       if(OP_ALL_NAME.equals(type)) {
         type = getOpName();
@@ -418,7 +445,7 @@ public class NNThroughputBenchmark implements Tool {
     void benchmarkOne() throws IOException {
       for(int idx = 0; idx < opsPerThread; idx++) {
         if((localNumOpsExecuted+1) % statsOp.ugcRefreshCount == 0)
-          nameNodeProto.refreshUserToGroupsMappings();
+          refreshUserMappingsProto.refreshUserToGroupsMappings();
         long stat = statsOp.executeOp(daemonId, idx, arg1);
         localNumOpsExecuted++;
         localCumulativeTime += stat;
@@ -484,10 +511,10 @@ public class NNThroughputBenchmark implements Tool {
     @Override
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
-      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+      clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
           false);
       long start = Time.now();
-      nameNodeProto.delete(BASE_DIR_NAME, true);
+      clientProto.delete(BASE_DIR_NAME, true);
       long end = Time.now();
       return end-start;
     }
@@ -553,7 +580,7 @@ public class NNThroughputBenchmark implements Tool {
     @Override
     void generateInputs(int[] opsPerThread) throws IOException {
       assert opsPerThread.length == numThreads : "Error opsPerThread.length"; 
-      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+      clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
           false);
       // int generatedFileIdx = 0;
       LOG.info("Generate " + numOpsRequired + " intputs for " + getOpName());
@@ -588,13 +615,13 @@ public class NNThroughputBenchmark implements Tool {
     throws IOException {
       long start = Time.now();
       // dummyActionNoSynch(fileIdx);
-      nameNodeProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
+      clientProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
                       clientName, new EnumSetWritable<CreateFlag>(EnumSet
               .of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, 
-          replication, BLOCK_SIZE, null);
+          replication, BLOCK_SIZE, CryptoProtocolVersion.supported());
       long end = Time.now();
       for(boolean written = !closeUponCreate; !written; 
-        written = nameNodeProto.complete(fileNames[daemonId][inputIdx],
+        written = clientProto.complete(fileNames[daemonId][inputIdx],
                                     clientName, null, HdfsConstants.GRANDFATHER_INODE_ID));
       return end-start;
     }
@@ -657,7 +684,7 @@ public class NNThroughputBenchmark implements Tool {
     @Override
     void generateInputs(int[] opsPerThread) throws IOException {
       assert opsPerThread.length == numThreads : "Error opsPerThread.length";
-      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+      clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
           false);
       LOG.info("Generate " + numOpsRequired + " inputs for " + getOpName());
       dirPaths = new String[numThreads][];
@@ -685,7 +712,7 @@ public class NNThroughputBenchmark implements Tool {
     long executeOp(int daemonId, int inputIdx, String clientName)
         throws IOException {
       long start = Time.now();
-      nameNodeProto.mkdirs(dirPaths[daemonId][inputIdx],
+      clientProto.mkdirs(dirPaths[daemonId][inputIdx],
           FsPermission.getDefault(), true);
       long end = Time.now();
       return end-start;
@@ -757,11 +784,11 @@ public class NNThroughputBenchmark implements Tool {
       }
       // use the same files for open
       super.generateInputs(opsPerThread);
-      if(nameNodeProto.getFileInfo(opCreate.getBaseDir()) != null
-          && nameNodeProto.getFileInfo(getBaseDir()) == null) {
-        nameNodeProto.rename(opCreate.getBaseDir(), getBaseDir());
+      if(clientProto.getFileInfo(opCreate.getBaseDir()) != null
+          && clientProto.getFileInfo(getBaseDir()) == null) {
+        clientProto.rename(opCreate.getBaseDir(), getBaseDir());
       }
-      if(nameNodeProto.getFileInfo(getBaseDir()) == null) {
+      if(clientProto.getFileInfo(getBaseDir()) == null) {
         throw new IOException(getBaseDir() + " does not exist.");
       }
     }
@@ -773,7 +800,7 @@ public class NNThroughputBenchmark implements Tool {
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
       long start = Time.now();
-      nameNodeProto.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
+      clientProto.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
       long end = Time.now();
       return end-start;
     }
@@ -803,7 +830,7 @@ public class NNThroughputBenchmark implements Tool {
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
       long start = Time.now();
-      nameNodeProto.delete(fileNames[daemonId][inputIdx], false);
+      clientProto.delete(fileNames[daemonId][inputIdx], false);
       long end = Time.now();
       return end-start;
     }
@@ -833,7 +860,7 @@ public class NNThroughputBenchmark implements Tool {
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
       long start = Time.now();
-      nameNodeProto.getFileInfo(fileNames[daemonId][inputIdx]);
+      clientProto.getFileInfo(fileNames[daemonId][inputIdx]);
       long end = Time.now();
       return end-start;
     }
@@ -877,7 +904,7 @@ public class NNThroughputBenchmark implements Tool {
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
       long start = Time.now();
-      nameNodeProto.rename(fileNames[daemonId][inputIdx],
+      clientProto.rename(fileNames[daemonId][inputIdx],
                       destNames[daemonId][inputIdx]);
       long end = Time.now();
       return end-start;
@@ -933,14 +960,14 @@ public class NNThroughputBenchmark implements Tool {
           new DataStorage(nsInfo),
           new ExportedBlockKeys(), VersionInfo.getVersion());
       // register datanode
-      dnRegistration = nameNodeProto.registerDatanode(dnRegistration);
+      dnRegistration = dataNodeProto.registerDatanode(dnRegistration);
+      dnRegistration.setNamespaceInfo(nsInfo);
       //first block reports
       storage = new DatanodeStorage(DatanodeStorage.generateUuid());
       final StorageBlockReport[] reports = {
           new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
       };
-      nameNodeProto.blockReport(dnRegistration, 
-          nameNode.getNamesystem().getBlockPoolId(), reports,
+      dataNodeProto.blockReport(dnRegistration, bpid, reports,
               new BlockReportContext(1, 0, System.nanoTime()));
     }
 
@@ -953,7 +980,7 @@ public class NNThroughputBenchmark implements Tool {
       // TODO:FEDERATION currently a single block pool is supported
       StorageReport[] rep = { new StorageReport(storage, false,
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
-      DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, rep,
+      DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
           0L, 0L, 0, 0, 0, null).getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
@@ -1002,7 +1029,7 @@ public class NNThroughputBenchmark implements Tool {
       // register datanode
       StorageReport[] rep = { new StorageReport(storage,
           false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
-      DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
+      DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
           rep, 0L, 0L, 0, 0, 0, null).getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
@@ -1041,8 +1068,7 @@ public class NNThroughputBenchmark implements Tool {
                   null) };
           StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
               targetStorageID, rdBlocks) };
-          nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode
-              .getNamesystem().getBlockPoolId(), report);
+          dataNodeProto.blockReceivedAndDeleted(receivedDNReg, bpid, report);
         }
       }
       return blocks.length;
@@ -1133,15 +1159,15 @@ public class NNThroughputBenchmark implements Tool {
       FileNameGenerator nameGenerator;
       nameGenerator = new FileNameGenerator(getBaseDir(), 100);
       String clientName = getClientName(007);
-      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+      clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
           false);
       for(int idx=0; idx < nrFiles; idx++) {
         String fileName = nameGenerator.getNextFileName("ThroughputBench");
-        nameNodeProto.create(fileName, FsPermission.getDefault(), clientName,
+        clientProto.create(fileName, FsPermission.getDefault(), clientName,
             new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication,
-            BLOCK_SIZE, null);
+            BLOCK_SIZE, CryptoProtocolVersion.supported());
         ExtendedBlock lastBlock = addBlocks(fileName, clientName);
-        nameNodeProto.complete(fileName, clientName, lastBlock, HdfsConstants.GRANDFATHER_INODE_ID);
+        clientProto.complete(fileName, clientName, lastBlock, HdfsConstants.GRANDFATHER_INODE_ID);
       }
       // prepare block reports
       for(int idx=0; idx < nrDatanodes; idx++) {
@@ -1153,7 +1179,7 @@ public class NNThroughputBenchmark implements Tool {
     throws IOException {
       ExtendedBlock prevBlock = null;
       for(int jdx = 0; jdx < blocksPerFile; jdx++) {
-        LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName,
+        LocatedBlock loc = clientProto.addBlock(fileName, clientName,
             prevBlock, null, HdfsConstants.GRANDFATHER_INODE_ID, null);
         prevBlock = loc.getBlock();
         for(DatanodeInfo dnInfo : loc.getLocations()) {
@@ -1164,8 +1190,8 @@ public class NNThroughputBenchmark implements Tool {
               ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null) };
           StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
               datanodes[dnIdx].storage.getStorageID(), rdBlocks) };
-          nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
-              .getBlock().getBlockPoolId(), report);
+          dataNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration,
+              bpid, report);
+              bpid, report);
         }
       }
       return prevBlock;
@@ -1186,8 +1212,7 @@ public class NNThroughputBenchmark implements Tool {
       long start = Time.now();
       StorageBlockReport[] report = { new StorageBlockReport(
           dn.storage, dn.getBlockReportList()) };
-      nameNodeProto.blockReport(dn.dnRegistration,
-          nameNode.getNamesystem().getBlockPoolId(), report,
+      dataNodeProto.blockReport(dn.dnRegistration, bpid, report,
           new BlockReportContext(1, 0, System.nanoTime()));
       long end = Time.now();
       return end-start;
@@ -1318,7 +1343,7 @@ public class NNThroughputBenchmark implements Tool {
         LOG.info("Datanode " + dn + " is decommissioned.");
       }
       excludeFile.close();
-      nameNodeProto.refreshNodes();
+      clientProto.refreshNodes();
     }
 
     /**
@@ -1414,8 +1439,6 @@ public class NNThroughputBenchmark implements Tool {
 
     // Start the NameNode
     String[] argv = new String[] {};
-    nameNode = NameNode.createNameNode(argv, config);
-    nameNodeProto = nameNode.getRpcServer();
 
     List<OperationStatsBase> ops = new ArrayList<OperationStatsBase>();
     OperationStatsBase opStat = null;
@@ -1456,6 +1479,29 @@ public class NNThroughputBenchmark implements Tool {
         opStat = new CleanAllStats(args);
         ops.add(opStat);
       }
+
+      if (namenodeUri == null) {
+        nameNode = NameNode.createNameNode(argv, config);
+        NamenodeProtocols nnProtos = nameNode.getRpcServer();
+        nameNodeProto = nnProtos;
+        clientProto = nnProtos;
+        dataNodeProto = nnProtos;
+        refreshUserMappingsProto = nnProtos;
+        bpid = nameNode.getNamesystem().getBlockPoolId();
+      } else {
+        FileSystem.setDefaultUri(getConf(), namenodeUri);
+        DistributedFileSystem dfs = (DistributedFileSystem)
+            FileSystem.get(getConf());
+        final URI nnUri = new URI(namenodeUri);
+        nameNodeProto = DFSTestUtil.getNamenodeProtocolProxy(config, nnUri,
+            UserGroupInformation.getCurrentUser());
+        clientProto = dfs.getClient().getNamenode();
+        dataNodeProto = new DatanodeProtocolClientSideTranslatorPB(
+            NameNode.getAddress(nnUri), config);
+        refreshUserMappingsProto =
+            DFSTestUtil.getRefreshUserMappingsProtocolProxy(config, nnUri);
+        getBlockPoolId(dfs);
+      }
       if(ops.size() == 0)
         printUsage();
       // run each benchmark
@@ -1476,6 +1522,12 @@ public class NNThroughputBenchmark implements Tool {
     return 0;
   }
 
+  private void getBlockPoolId(DistributedFileSystem unused)
+    throws IOException {
+    final NamespaceInfo nsInfo = nameNodeProto.versionRequest();
+    bpid = nsInfo.getBlockPoolID();
+  }
+
   public static void main(String[] args) throws Exception {
     NNThroughputBenchmark bench = null;
     try {

Reply via email to