HDFS-6940. Refactoring to allow ConsensusNode implementation.
Contributed by Konstantin Shvachko.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/88209ce1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/88209ce1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/88209ce1

Branch: refs/heads/HDFS-6584
Commit: 88209ce181b5ecc55c0ae2bceff4893ab4817e88
Parents: 3b35f81
Author: Konstantin V Shvachko <s...@apache.org>
Authored: Sat Sep 6 12:07:52 2014 -0700
Committer: Konstantin V Shvachko <s...@apache.org>
Committed: Sat Sep 6 12:07:52 2014 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  2 +
 .../server/blockmanagement/BlockManager.java    | 23 ++++++++--
 .../server/blockmanagement/DatanodeManager.java |  6 ++-
 .../server/blockmanagement/HostFileManager.java |  4 ++
 .../hdfs/server/namenode/FSNamesystem.java      | 46 +++++++++++---------
 .../hdfs/server/namenode/NameNodeAdapter.java   |  2 +-
 6 files changed, 57 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/88209ce1/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 333bdce..4412b30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -444,6 +444,8 @@ Release 2.6.0 - UNRELEASED
     HDFS-6376. Distcp data between two HA clusters requires another configuration.
     (Dave Marion and Haohui Mai via jing9)
 
+    HDFS-6940. Refactoring to allow ConsensusNode implementation. (shv)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88209ce1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 8470680..6176188 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -164,7 +164,7 @@ public class BlockManager {
   final BlocksMap blocksMap;
 
   /** Replication thread. */
-  final Daemon replicationThread = new Daemon(new ReplicationMonitor());
+  Daemon replicationThread;
   
   /** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
   final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
@@ -263,6 +263,7 @@ public class BlockManager {
     this.namesystem = namesystem;
     datanodeManager = new DatanodeManager(this, namesystem, conf);
     heartbeatManager = datanodeManager.getHeartbeatManager();
+    setReplicationMonitor(new ReplicationMonitor());
 
     final long pendingPeriod = conf.getLong(
         DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
@@ -394,7 +395,23 @@ public class BlockManager {
           lifetimeMin*60*1000L, 0, null, encryptionAlgorithm);
     }
   }
-  
+
+  public long getReplicationRecheckInterval() {
+    return replicationRecheckInterval;
+  }
+
+  public AtomicLong excessBlocksCount() {
+    return excessBlocksCount;
+  }
+
+  public void clearInvalidateBlocks() {
+    invalidateBlocks.clear();
+  }
+
+  void setReplicationMonitor(Runnable replicationMonitor) {
+    replicationThread = new Daemon(replicationMonitor);
+  }
+
   public void setBlockPoolId(String blockPoolId) {
     if (isBlockTokenEnabled()) {
       blockTokenSecretManager.setBlockPoolId(blockPoolId);
@@ -1616,7 +1633,7 @@ public class BlockManager {
    * If there were any replication requests that timed out, reap them
    * and put them back into the neededReplication queue
    */
-  private void processPendingReplications() {
+  void processPendingReplications() {
     Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
     if (timedOutItems != null) {
       namesystem.writeLock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88209ce1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 709f060..55d616f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1053,7 +1053,7 @@ public class DatanodeManager {
    * 3. Added to exclude --> start decommission.
    * 4. Removed from exclude --> stop decommission.
    */
-  private void refreshDatanodes() {
+  void refreshDatanodes() {
     for(DatanodeDescriptor node : datanodeMap.values()) {
       // Check if not include.
       if (!hostFileManager.isIncluded(node)) {
@@ -1586,5 +1586,9 @@ public class DatanodeManager {
   public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) {
     this.shouldSendCachingCommands = shouldSendCachingCommands;
   }
+
+  public HostFileManager getHostFileManager() {
+    return this.hostFileManager;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88209ce1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java
index 0b8d6c5..7db23e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java
@@ -129,6 +129,10 @@ class HostFileManager {
   void refresh(String includeFile, String excludeFile) throws IOException {
     HostSet newIncludes = readFile("included", includeFile);
     HostSet newExcludes = readFile("excluded", excludeFile);
+    setHosts(newIncludes, newExcludes);
+  }
+
+  void setHosts(HostSet newIncludes, HostSet newExcludes) {
     synchronized (this) {
       includes = newIncludes;
       excludes = newExcludes;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88209ce1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index c1744f6..a6b98a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -978,7 +978,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return Collections.unmodifiableList(auditLoggers);
   }
 
-  private void loadFSImage(StartupOption startOpt) throws IOException {
+  protected void loadFSImage(StartupOption startOpt) throws IOException {
     final FSImage fsImage = getFSImage();
 
     // format before starting up if requested
@@ -1026,7 +1026,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     imageLoadComplete();
   }
 
-  private void startSecretManager() {
+  protected void startSecretManager() {
     if (dtSecretManager != null) {
       try {
         dtSecretManager.startThreads();
@@ -1038,7 +1038,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
   }
   
-  private void startSecretManagerIfNecessary() {
+  protected void startSecretManagerIfNecessary() {
     boolean shouldRun = shouldUseDelegationTokens() &&
       !isInSafeMode() && getEditLog().isOpenForWrite();
     boolean running = dtSecretManager.isRunning();
@@ -1188,7 +1188,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return haEnabled && inActiveState() && startingActiveService;
   }
 
-  private boolean shouldUseDelegationTokens() {
+  protected boolean shouldUseDelegationTokens() {
     return UserGroupInformation.isSecurityEnabled() ||
       alwaysUseDelegationTokensForTests;
   }
@@ -2729,6 +2729,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * @throws UnresolvedLinkException
    * @throws IOException
    */
+  protected
   LocatedBlock prepareFileForWrite(String src, INodeFile file,
                                    String leaseHolder, String clientMachine,
                                    boolean writeToEditLog,
@@ -3185,6 +3186,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return new FileState(pendingFile, src);
   }
 
+  protected
   LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs,
                                         long offset) throws IOException {
     LocatedBlock lBlk = new LocatedBlock(
@@ -3302,8 +3304,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return true;
   }
 
-  private INodeFile checkLease(String src, String holder, INode inode,
-                               long fileId)
+  protected INodeFile checkLease(String src, String holder, INode inode,
+                                 long fileId)
       throws LeaseExpiredException, FileNotFoundException {
     assert hasReadLock();
     final String ident = src + " (inode " + fileId + ")";
@@ -4420,7 +4422,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return leaseManager.reassignLease(lease, src, newHolder);
   }
 
-  private void commitOrCompleteLastBlock(final INodeFile fileINode,
+  protected void commitOrCompleteLastBlock(final INodeFile fileINode,
       final Block commitBlock) throws IOException {
     assert hasWriteLock();
     Preconditions.checkArgument(fileINode.isUnderConstruction());
@@ -4816,6 +4818,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * @return an array of datanode commands 
    * @throws IOException
    */
+  protected
   HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
       StorageReport[] reports, long cacheCapacity, long cacheUsed,
       int xceiverCount, int xmitsInProgress, int failedVolumes)
@@ -4865,8 +4868,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * @param file
    * @param logRetryCache
    */
-  private void persistBlocks(String path, INodeFile file,
-                             boolean logRetryCache) {
+  protected void persistBlocks(String path, INodeFile file,
+                               boolean logRetryCache) {
     assert hasWriteLock();
     Preconditions.checkArgument(file.isUnderConstruction());
     getEditLog().logUpdateBlocks(path, file, logRetryCache);
@@ -5297,7 +5300,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * @param path
    * @param file
    */
-  private void persistNewBlock(String path, INodeFile file) {
+  protected void persistNewBlock(String path, INodeFile file) {
     Preconditions.checkArgument(file.isUnderConstruction());
     getEditLog().logAddBlock(path, file);
     if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -7175,7 +7178,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * 
    * @return true if delegation token operation is allowed
    */
-  private boolean isAllowedDelegationTokenOp() throws IOException {
+  protected boolean isAllowedDelegationTokenOp() throws IOException {
     AuthenticationMethod authMethod = getConnectionAuthenticationMethod();
     if (UserGroupInformation.isSecurityEnabled()
         && (authMethod != AuthenticationMethod.KERBEROS)
@@ -7342,7 +7345,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
     blockManager.getDatanodeManager().fetchDatanodes(live, null, true);
     for (DatanodeDescriptor node : live) {
-      Map<String, Object> innerinfo = ImmutableMap.<String, Object>builder()
+      info.put(node.getHostName(), getLiveNodeInfo(node));
+    }
+    return JSON.toString(info);
+  }
+
+  protected Map<String, Object> getLiveNodeInfo(DatanodeDescriptor node) {
+    return ImmutableMap.<String, Object>builder()
           .put("infoAddr", node.getInfoAddr())
           .put("infoSecureAddr", node.getInfoSecureAddr())
           .put("xferaddr", node.getXferAddr())
@@ -7360,10 +7369,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           .put("blockPoolUsedPercent", node.getBlockPoolUsedPercent())
           .put("volfails", node.getVolumeFailures())
           .build();
-
-      info.put(node.getHostName(), innerinfo);
-    }
-    return JSON.toString(info);
   }
 
   /**
@@ -7648,17 +7653,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   public ReentrantLock getLongReadLockForTests() {
     return fsLock.longReadLock;
   }
-
-  @VisibleForTesting
-  public SafeModeInfo getSafeModeInfoForTests() {
-    return safeMode;
-  }
   
   @VisibleForTesting
   public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) {
     this.nnResourceChecker = nnResourceChecker;
   }
 
+  public SafeModeInfo getSafeModeInfo() {
+    return safeMode;
+  }
+
   @Override
   public boolean isAvoidingStaleDataNodesForWrite() {
     return this.blockManager.getDatanodeManager()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88209ce1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index c32ed67..d65d1ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -223,7 +223,7 @@ public class NameNodeAdapter {
    * if safemode is not running.
    */
   public static int getSafeModeSafeBlocks(NameNode nn) {
-    SafeModeInfo smi = nn.getNamesystem().getSafeModeInfoForTests();
+    SafeModeInfo smi = nn.getNamesystem().getSafeModeInfo();
     if (smi == null) {
       return -1;
     }

Reply via email to