Author: szetszwo
Date: Mon Aug  6 12:03:13 2012
New Revision: 1369798

URL: http://svn.apache.org/viewvc?rev=1369798&view=rev
Log:
HDFS-385. Backport: Add support for an experimental API that allows a module 
external to HDFS to specify how HDFS blocks should be placed.  Contributed by 
Sumadhur Reddy Bolli
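
For illustration only (not part of this patch): assuming a hypothetical
policy class MyBlockPlacementPolicy, a deployment could select it through
the dfs.block.replicator.classname key that the new
BlockPlacementPolicy.getInstance() reads, either in hdfs-site.xml or
programmatically:

    Configuration conf = new Configuration();
    // MyBlockPlacementPolicy is a placeholder name; it must extend
    // BlockPlacementPolicy (the default is BlockPlacementPolicyDefault).
    conf.setClass("dfs.block.replicator.classname",
                  MyBlockPlacementPolicy.class, BlockPlacementPolicy.class);
    // The namenode instantiates and initializes the policy via reflection.
    BlockPlacementPolicy policy =
        BlockPlacementPolicy.getInstance(conf, null, null);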

Added:
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java.orig
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnsupportedActionException.java
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1369798&r1=1369797&r2=1369798&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Mon Aug  6 12:03:13 2012
@@ -23,6 +23,10 @@ Release 1.2.0 - unreleased
     HDFS-528. Backport: Add ability for safemode to wait for a minimum number
     of live datanodes.  (szetszwo)
 
+    HDFS-385. Backport: Add support for an experimental API that allows a
+    module external to HDFS to specify how HDFS blocks should be placed.
+    (Sumadhur Reddy Bolli via szetszwo)
+
   IMPROVEMENTS
 
     HDFS-3515. Port HDFS-1457 to branch-1. (eli)

Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java?rev=1369798&r1=1369797&r2=1369798&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java Mon Aug  6 12:03:13 2012
@@ -569,7 +569,7 @@ public class NetworkTopology {
    * @return number of available nodes
    */
   public int countNumOfAvailableNodes(String scope,
-                                      List<Node> excludedNodes) {
+                                      Collection<Node> excludedNodes) {
     boolean isExcluded=false;
     if (scope.startsWith("~")) {
       isExcluded=true;

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1369798&r1=1369797&r2=1369798&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java Mon Aug  6 12:03:13 2012
@@ -25,6 +25,7 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.lang.Class;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -61,6 +62,9 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.fs.FileSystem;
@@ -786,22 +790,38 @@ public class Balancer implements Tool {
       }
     }
   }
-  
+
+  /*
+   * Check that this Balancer is compatible with the Block Placement Policy used
+   * by the Namenode.
+   */
+  private void checkReplicationPolicyCompatibility(Configuration conf)
+      throws UnsupportedActionException {
+    if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() != BlockPlacementPolicyDefault.class) {
+      throw new UnsupportedActionException(
+          "Balancer without BlockPlacementPolicyDefault");
+    }
+  }
+
   /** Default constructor */
-  Balancer() {
+  Balancer() throws UnsupportedActionException {
+    checkReplicationPolicyCompatibility(getConf());
   }
-  
+
   /** Construct a balancer from the given configuration */
-  Balancer(Configuration conf) {
+  Balancer(Configuration conf) throws UnsupportedActionException {
+    checkReplicationPolicyCompatibility(conf);
     setConf(conf);
-  } 
+  }
 
   /** Construct a balancer from the given configuration and threshold */
-  Balancer(Configuration conf, double threshold) {
+  Balancer(Configuration conf, double threshold)
+      throws UnsupportedActionException {
+    checkReplicationPolicyCompatibility(conf);
     setConf(conf);
     this.threshold = threshold;
   }
-
+  
   /**
    * Run a balancer
    * @param args

Added: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java?rev=1369798&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java (added)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java Mon Aug  6 12:03:13 2012
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.util.ReflectionUtils;
+import java.util.*;
+
+/** 
+ * This interface is used for choosing the desired number of targets
+ * for placing block replicas.
+ */
+@InterfaceAudience.Private
+public abstract class BlockPlacementPolicy {
+    
+  @InterfaceAudience.Private
+  public static class NotEnoughReplicasException extends Exception {
+    private static final long serialVersionUID = 1L;
+    NotEnoughReplicasException(String msg) {
+      super(msg);
+    }
+  }
+    
+  /**
+   * choose <i>numOfReplicas</i> data nodes for <i>writer</i> 
+   * to re-replicate a block with size <i>blocksize</i> 
+   * If not, return as many as we can.
+   * 
+   * @param srcPath the file to which this chooseTargets is being invoked. 
+   * @param numOfReplicas additional number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param chosenNodes datanodes that have been chosen as targets.
+   * @param blocksize size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as target 
+   * and sorted as a pipeline.
+   */
+  abstract DatanodeDescriptor[] chooseTarget(String srcPath,
+                                             int numOfReplicas,
+                                             DatanodeDescriptor writer,
+                                             List<DatanodeDescriptor> chosenNodes,
+                                             long blocksize);
+
+  /**
+   * choose <i>numOfReplicas</i> data nodes for <i>writer</i> 
+   * to re-replicate a block with size <i>blocksize</i> 
+   * If not, return as many as we can.
+   *
+   * @param srcPath the file to which this chooseTargets is being invoked.
+   * @param numOfReplicas additional number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param chosenNodes datanodes that have been chosen as targets.
+   * @param excludedNodes: datanodes that should not be considered as targets.
+   * @param blocksize size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as target
+   * and sorted as a pipeline.
+   */
+  public abstract DatanodeDescriptor[] chooseTarget(String srcPath,
+                                             int numOfReplicas,
+                                             DatanodeDescriptor writer,
+                                             List<DatanodeDescriptor> chosenNodes,
+                                             HashMap<Node, Node> excludedNodes,
+                                             long blocksize);
+
+  /**
+   * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
+   * If not, return as many as we can.
+   * The base implementation extracts the pathname of the file from the
+   * specified srcInode, but this could be a costly operation depending on the
+   * file system implementation. Concrete implementations of this class should
+   * override this method to avoid this overhead.
+   * 
+   * @param srcInode The inode of the file for which chooseTarget is being invoked.
+   * @param numOfReplicas additional number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param chosenNodes datanodes that have been chosen as targets.
+   * @param blocksize size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as target 
+   * and sorted as a pipeline.
+   */
+  DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    List<DatanodeDescriptor> chosenNodes,
+                                    long blocksize) {
+    return chooseTarget(srcInode.getFullPathName(), numOfReplicas, writer,
+                        chosenNodes, blocksize);
+  }
+
+  /**
+   * Verify that the block is replicated on at least minRacks different racks
+   * if there is more than minRacks rack in the system.
+   * 
+   * @param srcPath the full pathname of the file to be verified
+   * @param lBlk block with locations
+   * @param minRacks number of racks the block should be replicated to
+   * @return the difference between the required and the actual number of racks
+   * the block is replicated to.
+   */
+  abstract public int verifyBlockPlacement(String srcPath,
+                                           LocatedBlock lBlk,
+                                           int minRacks);
+  /**
+   * Decide whether deleting the specified replica of the block still makes 
+   * the block conform to the configured block placement policy.
+   * 
+   * @param srcInode The inode of the file to which the block-to-be-deleted belongs
+   * @param block The block to be deleted
+   * @param replicationFactor The required number of replicas for this block
+   * @param existingReplicas The replica locations of this block that are present
+                  on at least two unique racks. 
+   * @param moreExistingReplicas Replica locations of this block that are not
+                   listed in the previous parameter.
+   * @return the replica that is the best candidate for deletion
+   */
+  abstract public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo srcInode,
+                                      Block block, 
+                                      short replicationFactor,
+                                      Collection<DatanodeDescriptor> existingReplicas,
+                                      Collection<DatanodeDescriptor> moreExistingReplicas);
+
+  /**
+   * Used to setup a BlockPlacementPolicy object. This should be defined by 
+   * all implementations of a BlockPlacementPolicy.
+   * 
+   * @param conf the configuration object
+   * @param stats retrieve cluster status from here
+   * @param clusterMap cluster topology
+   */
+  abstract protected void initialize(Configuration conf,  FSClusterStats stats, 
+                                     NetworkTopology clusterMap);
+    
+  /**
+   * Get an instance of the configured Block Placement Policy based on the
+   * value of the configuration parameter dfs.block.replicator.classname.
+   * 
+   * @param conf the configuration to be used
+   * @param stats an object that is used to retrieve the load on the cluster
+   * @param clusterMap the network topology of the cluster
+   * @return an instance of BlockPlacementPolicy
+   */
+  public static BlockPlacementPolicy getInstance(Configuration conf, 
+                                                 FSClusterStats stats,
+                                                 NetworkTopology clusterMap) {
+    Class<? extends BlockPlacementPolicy> replicatorClass =
+                      conf.getClass("dfs.block.replicator.classname",
+                                    BlockPlacementPolicyDefault.class,
+                                    BlockPlacementPolicy.class);
+    BlockPlacementPolicy replicator = (BlockPlacementPolicy) ReflectionUtils.newInstance(
+                                                             replicatorClass, conf);
+    replicator.initialize(conf, stats, clusterMap);
+    return replicator;
+  }
+
+  /**
+   * choose <i>numOfReplicas</i> nodes for <i>writer</i> to replicate
+   * a block with size <i>blocksize</i> 
+   * If not, return as many as we can.
+   * 
+   * @param srcPath a string representation of the file for which chooseTarget is invoked
+   * @param numOfReplicas number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param blocksize size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as targets
+   * and sorted as a pipeline.
+   */
+  DatanodeDescriptor[] chooseTarget(String srcPath,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    long blocksize) {
+    return chooseTarget(srcPath, numOfReplicas, writer,
+                        new ArrayList<DatanodeDescriptor>(),
+                        blocksize);
+  }
+
+  /**
+   * choose <i>numOfReplicas</i> nodes for <i>writer</i> to replicate
+   * a block with size <i>blocksize</i>
+   * If not, return as many as we can.
+   *
+   * @param srcPath a string representation of the file for which chooseTarget is invoked
+   * @param numOfReplicas number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param blocksize size of the data to be written.
+   * @param excludedNodes datanodes that should not be considered as targets.
+   * @return array of DatanodeDescriptor instances chosen as targets
+   * and sorted as a pipeline.
+   */
+  public DatanodeDescriptor[] chooseTarget(String srcPath,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    HashMap<Node, Node> excludedNodes,
+                                    long blocksize) {
+    return chooseTarget(srcPath, numOfReplicas, writer,
+                        new ArrayList<DatanodeDescriptor>(),
+                        excludedNodes,
+                        blocksize);
+  }
+
+}
\ No newline at end of file
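
For illustration, a minimal, hypothetical sketch (not part of this patch) of
an external policy built on this API; it lives in the same package, simply
falls back to the default behaviour, and a real implementation would also
override verifyBlockPlacement() and chooseReplicaToDelete():

    package org.apache.hadoop.hdfs.server.namenode;

    import java.util.HashMap;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.net.NetworkTopology;
    import org.apache.hadoop.net.Node;

    /** Hypothetical example policy extending the default implementation. */
    public class MyBlockPlacementPolicy extends BlockPlacementPolicyDefault {

      @Override
      public void initialize(Configuration conf, FSClusterStats stats,
                             NetworkTopology clusterMap) {
        super.initialize(conf, stats, clusterMap);
        // read any custom configuration keys here
      }

      @Override
      public DatanodeDescriptor[] chooseTarget(String srcPath, int numOfReplicas,
          DatanodeDescriptor writer, List<DatanodeDescriptor> chosenNodes,
          HashMap<Node, Node> excludedNodes, long blocksize) {
        // custom target selection would go here; fall back to the default
        return super.chooseTarget(srcPath, numOfReplicas, writer, chosenNodes,
                                  excludedNodes, blocksize);
      }
    }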

Added: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java?rev=1369798&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java (added)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java Mon Aug  6 12:03:13 2012
@@ -0,0 +1,516 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+import java.util.*;
+
+/** The class is responsible for choosing the desired number of targets
+ * for placing block replicas.
+ * The replica placement strategy is that if the writer is on a datanode,
+ * the 1st replica is placed on the local machine, 
+ * otherwise a random datanode. The 2nd replica is placed on a datanode
+ * that is on a different rack. The 3rd replica is placed on a datanode
+ * which is on a different node in the same rack as the second replica.
+ */
+@InterfaceAudience.Private
+public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
+  private boolean considerLoad; 
+  private NetworkTopology clusterMap;
+  private FSClusterStats stats;
+
+  BlockPlacementPolicyDefault(Configuration conf,  FSClusterStats stats,
+                           NetworkTopology clusterMap) {
+    initialize(conf, stats, clusterMap);
+  }
+
+  BlockPlacementPolicyDefault() {
+  }
+    
+  /** {@inheritDoc} */
+  public void initialize(Configuration conf,  FSClusterStats stats,
+                         NetworkTopology clusterMap) {
+    this.considerLoad = conf.getBoolean("dfs.replication.considerLoad", true);
+    this.stats = stats;
+    this.clusterMap = clusterMap;
+  }
+
+  /** {@inheritDoc} */
+  public DatanodeDescriptor[] chooseTarget(String srcPath,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    List<DatanodeDescriptor> chosenNodes,
+                                    long blocksize) {
+    return chooseTarget(numOfReplicas, writer, chosenNodes, null, blocksize);
+  }
+
+  /** {@inheritDoc} */
+  public DatanodeDescriptor[] chooseTarget(String srcPath,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    List<DatanodeDescriptor> chosenNodes,
+                                    HashMap<Node, Node> excludedNodes,
+                                    long blocksize) {
+    return chooseTarget(numOfReplicas, writer, chosenNodes, excludedNodes, blocksize);
+  }
+
+
+  /** {@inheritDoc} */
+  @Override
+  public DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    List<DatanodeDescriptor> chosenNodes,
+                                    long blocksize) {
+    return chooseTarget(numOfReplicas, writer, chosenNodes, null, blocksize);
+  }
+    
+  /**
+   * This is not part of the public API but is used by the unit tests.
+   */
+  DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    List<DatanodeDescriptor> chosenNodes,
+                                    HashMap<Node, Node> excludedNodes,
+                                    long blocksize) {
+    if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
+      return new DatanodeDescriptor[0];
+    }
+      
+    if (excludedNodes == null) {
+      excludedNodes = new HashMap<Node, Node>();
+    }
+     
+    int clusterSize = clusterMap.getNumOfLeaves();
+    int totalNumOfReplicas = chosenNodes.size()+numOfReplicas;
+    if (totalNumOfReplicas > clusterSize) {
+      numOfReplicas -= (totalNumOfReplicas-clusterSize);
+      totalNumOfReplicas = clusterSize;
+    }
+      
+    int maxNodesPerRack = 
+      (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
+      
+    List<DatanodeDescriptor> results = 
+      new ArrayList<DatanodeDescriptor>(chosenNodes);
+    for (Node node:chosenNodes) {
+      excludedNodes.put(node, node);
+    }
+      
+    if (!clusterMap.contains(writer)) {
+      writer=null;
+    }
+      
+    DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, 
+                                                excludedNodes, blocksize, maxNodesPerRack, results);
+      
+    results.removeAll(chosenNodes);
+      
+    // sorting nodes to form a pipeline
+    return getPipeline((writer==null)?localNode:writer,
+                       results.toArray(new DatanodeDescriptor[results.size()]));
+  }
+    
+  /* choose <i>numOfReplicas</i> from all data nodes */
+  private DatanodeDescriptor chooseTarget(int numOfReplicas,
+                                          DatanodeDescriptor writer,
+                                          HashMap<Node, Node> excludedNodes,
+                                          long blocksize,
+                                          int maxNodesPerRack,
+                                          List<DatanodeDescriptor> results) {
+      
+    if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
+      return writer;
+    }
+      
+    int numOfResults = results.size();
+    boolean newBlock = (numOfResults==0);
+    if (writer == null && !newBlock) {
+      writer = results.get(0);
+    }
+      
+    try {
+      if (numOfResults == 0) {
+        writer = chooseLocalNode(writer, excludedNodes, 
+                                 blocksize, maxNodesPerRack, results);
+        if (--numOfReplicas == 0) {
+          return writer;
+        }
+      }
+      if (numOfResults <= 1) {
+        chooseRemoteRack(1, results.get(0), excludedNodes, 
+                         blocksize, maxNodesPerRack, results);
+        if (--numOfReplicas == 0) {
+          return writer;
+        }
+      }
+      if (numOfResults <= 2) {
+        if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
+          chooseRemoteRack(1, results.get(0), excludedNodes,
+                           blocksize, maxNodesPerRack, results);
+        } else if (newBlock){
+          chooseLocalRack(results.get(1), excludedNodes, blocksize, 
+                          maxNodesPerRack, results);
+        } else {
+          chooseLocalRack(writer, excludedNodes, blocksize,
+                          maxNodesPerRack, results);
+        }
+        if (--numOfReplicas == 0) {
+          return writer;
+        }
+      }
+      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, 
+                   blocksize, maxNodesPerRack, results);
+    } catch (NotEnoughReplicasException e) {
+      FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
+               + numOfReplicas);
+    }
+    return writer;
+  }
+    
+  /* choose <i>localMachine</i> as the target.
+   * if <i>localMachine</i> is not available, 
+   * choose a node on the same rack
+   * @return the chosen node
+   */
+  private DatanodeDescriptor chooseLocalNode(
+                                             DatanodeDescriptor localMachine,
+                                             HashMap<Node, Node> excludedNodes,
+                                             long blocksize,
+                                             int maxNodesPerRack,
+                                             List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    // if no local machine, randomly choose one node
+    if (localMachine == null)
+      return chooseRandom(NodeBase.ROOT, excludedNodes, 
+                          blocksize, maxNodesPerRack, results);
+      
+    // otherwise try local machine first
+    Node oldNode = excludedNodes.put(localMachine, localMachine);
+    if (oldNode == null) { // was not in the excluded list
+      if (isGoodTarget(localMachine, blocksize,
+                       maxNodesPerRack, false, results)) {
+        results.add(localMachine);
+        return localMachine;
+      }
+    } 
+      
+    // try a node on local rack
+    return chooseLocalRack(localMachine, excludedNodes, 
+                           blocksize, maxNodesPerRack, results);
+  }
+    
+  /* choose one node from the rack that <i>localMachine</i> is on.
+   * if no such node is available, choose one node from the rack where
+   * a second replica is on.
+   * if still no such node is available, choose a random node 
+   * in the cluster.
+   * @return the chosen node
+   */
+  private DatanodeDescriptor chooseLocalRack(
+                                             DatanodeDescriptor localMachine,
+                                             HashMap<Node, Node> excludedNodes,
+                                             long blocksize,
+                                             int maxNodesPerRack,
+                                             List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    // no local machine, so choose a random machine
+    if (localMachine == null) {
+      return chooseRandom(NodeBase.ROOT, excludedNodes, 
+                          blocksize, maxNodesPerRack, results);
+    }
+      
+    // choose one from the local rack
+    try {
+      return chooseRandom(
+                          localMachine.getNetworkLocation(),
+                          excludedNodes, blocksize, maxNodesPerRack, results);
+    } catch (NotEnoughReplicasException e1) {
+      // find the second replica
+      DatanodeDescriptor newLocal=null;
+      for(Iterator<DatanodeDescriptor> iter=results.iterator();
+          iter.hasNext();) {
+        DatanodeDescriptor nextNode = iter.next();
+        if (nextNode != localMachine) {
+          newLocal = nextNode;
+          break;
+        }
+      }
+      if (newLocal != null) {
+        try {
+          return chooseRandom(
+                              newLocal.getNetworkLocation(),
+                              excludedNodes, blocksize, maxNodesPerRack, results);
+        } catch(NotEnoughReplicasException e2) {
+          //otherwise randomly choose one from the network
+          return chooseRandom(NodeBase.ROOT, excludedNodes,
+                              blocksize, maxNodesPerRack, results);
+        }
+      } else {
+        //otherwise randomly choose one from the network
+        return chooseRandom(NodeBase.ROOT, excludedNodes,
+                            blocksize, maxNodesPerRack, results);
+      }
+    }
+  }
+    
+  /* choose <i>numOfReplicas</i> nodes from the racks 
+   * that <i>localMachine</i> is NOT on.
+   * if not enough nodes are available, choose the remaining ones 
+   * from the local rack
+   */
+    
+  private void chooseRemoteRack(int numOfReplicas,
+                                DatanodeDescriptor localMachine,
+                                HashMap<Node, Node> excludedNodes,
+                                long blocksize,
+                                int maxReplicasPerRack,
+                                List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    int oldNumOfReplicas = results.size();
+    // randomly choose one node from remote racks
+    try {
+      chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
+                   excludedNodes, blocksize, maxReplicasPerRack, results);
+    } catch (NotEnoughReplicasException e) {
+      chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
+                   localMachine.getNetworkLocation(), excludedNodes, blocksize, 
+                   maxReplicasPerRack, results);
+    }
+  }
+
+  /* Randomly choose one target from <i>nodes</i>.
+   * @return the chosen node
+   */
+  private DatanodeDescriptor chooseRandom(
+                                          String nodes,
+                                          HashMap<Node, Node> excludedNodes,
+                                          long blocksize,
+                                          int maxNodesPerRack,
+                                          List<DatanodeDescriptor> results) 
+    throws NotEnoughReplicasException {
+    int numOfAvailableNodes =
+      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
+    while(numOfAvailableNodes > 0) {
+      DatanodeDescriptor chosenNode = 
+        (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
+
+      Node oldNode = excludedNodes.put(chosenNode, chosenNode);
+      if (oldNode == null) { // chosenNode was not in the excluded list
+        numOfAvailableNodes--;
+        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+          results.add(chosenNode);
+          return chosenNode;
+        }
+      }
+    }
+
+    throw new NotEnoughReplicasException(
+        "Not able to place enough replicas");
+  }
+    
+  /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
+   */
+  private void chooseRandom(int numOfReplicas,
+                            String nodes,
+                            HashMap<Node, Node> excludedNodes,
+                            long blocksize,
+                            int maxNodesPerRack,
+                            List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+      
+    int numOfAvailableNodes =
+      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
+    while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
+      DatanodeDescriptor chosenNode = 
+        (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
+      Node oldNode = excludedNodes.put(chosenNode, chosenNode);
+      if (oldNode == null) {
+        numOfAvailableNodes--;
+
+        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+          numOfReplicas--;
+          results.add(chosenNode);
+        }
+      }
+    }
+      
+    if (numOfReplicas>0) {
+      throw new NotEnoughReplicasException(
+                                           "Not able to place enough replicas");
+    }
+  }
+    
+  /* judge if a node is a good target.
+   * return true if <i>node</i> has enough space, 
+   * does not have too much load, and the rack does not have too many nodes
+   */
+  private boolean isGoodTarget(DatanodeDescriptor node,
+                               long blockSize, int maxTargetPerLoc,
+                               List<DatanodeDescriptor> results) {
+    return isGoodTarget(node, blockSize, maxTargetPerLoc,
+                        this.considerLoad, results);
+  }
+    
+  private boolean isGoodTarget(DatanodeDescriptor node,
+                               long blockSize, int maxTargetPerLoc,
+                               boolean considerLoad,
+                               List<DatanodeDescriptor> results) {
+    Log logr = FSNamesystem.LOG;
+    // check if the node is (being) decommissioned
+    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+      logr.debug("Node "+NodeBase.getPath(node)+
+                " is not chosen because the node is (being) decommissioned");
+      return false;
+    }
+
+    long remaining = node.getRemaining() - 
+                     (node.getBlocksScheduled() * blockSize); 
+    // check the remaining capacity of the target machine
+    if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
+      logr.debug("Node "+NodeBase.getPath(node)+
+                " is not chosen because the node does not have enough space");
+      return false;
+    }
+      
+    // check the communication traffic of the target machine
+    if (considerLoad) {
+      double avgLoad = 0;
+      int size = clusterMap.getNumOfLeaves();
+      if (size != 0 && stats != null) {
+        avgLoad = (double)stats.getTotalLoad()/size;
+      }
+      if (node.getXceiverCount() > (2.0 * avgLoad)) {
+        logr.debug("Node "+NodeBase.getPath(node)+
+                  " is not chosen because the node is too busy");
+        return false;
+      }
+    }
+      
+    // check if the target rack has chosen too many nodes
+    String rackname = node.getNetworkLocation();
+    int counter=1;
+    for(Iterator<DatanodeDescriptor> iter = results.iterator();
+        iter.hasNext();) {
+      Node result = iter.next();
+      if (rackname.equals(result.getNetworkLocation())) {
+        counter++;
+      }
+    }
+    if (counter>maxTargetPerLoc) {
+      logr.debug("Node "+NodeBase.getPath(node)+
+                " is not chosen because the rack has too many chosen nodes");
+      return false;
+    }
+    return true;
+  }
+    
+  /* Return a pipeline of nodes.
+   * The pipeline is formed finding a shortest path that 
+   * starts from the writer and traverses all <i>nodes</i>
+   * This is basically a traveling salesman problem.
+   */
+  private DatanodeDescriptor[] getPipeline(
+                                           DatanodeDescriptor writer,
+                                           DatanodeDescriptor[] nodes) {
+    if (nodes.length==0) return nodes;
+      
+    synchronized(clusterMap) {
+      int index=0;
+      if (writer == null || !clusterMap.contains(writer)) {
+        writer = nodes[0];
+      }
+      for(;index<nodes.length; index++) {
+        DatanodeDescriptor shortestNode = nodes[index];
+        int shortestDistance = clusterMap.getDistance(writer, shortestNode);
+        int shortestIndex = index;
+        for(int i=index+1; i<nodes.length; i++) {
+          DatanodeDescriptor currentNode = nodes[i];
+          int currentDistance = clusterMap.getDistance(writer, currentNode);
+          if (shortestDistance>currentDistance) {
+            shortestDistance = currentDistance;
+            shortestNode = currentNode;
+            shortestIndex = i;
+          }
+        }
+        //switch position index & shortestIndex
+        if (index != shortestIndex) {
+          nodes[shortestIndex] = nodes[index];
+          nodes[index] = shortestNode;
+        }
+        writer = shortestNode;
+      }
+    }
+    return nodes;
+  }
+
+  /** {@inheritDoc} */
+  public int verifyBlockPlacement(String srcPath,
+                                  LocatedBlock lBlk,
+                                  int minRacks) {
+    DatanodeInfo[] locs = lBlk.getLocations();
+    if (locs == null)
+      locs = new DatanodeInfo[0];
+    int numRacks = clusterMap.getNumOfRacks();
+    if(numRacks <= 1) // only one rack
+      return 0;
+    minRacks = Math.min(minRacks, numRacks);
+    // 1. Check that all locations are different.
+    // 2. Count locations on different racks.
+    Set<String> racks = new TreeSet<String>();
+    for (DatanodeInfo dn : locs)
+      racks.add(dn.getNetworkLocation());
+    return minRacks - racks.size();
+  }
+
+  /** {@inheritDoc} */
+  public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo inode,
+                                                 Block block,
+                                                 short replicationFactor,
+                                                 Collection<DatanodeDescriptor> first, 
+                                                 Collection<DatanodeDescriptor> second) {
+    long minSpace = Long.MAX_VALUE;
+    DatanodeDescriptor cur = null;
+
+    // pick replica from the first Set. If first is empty, then pick replicas
+    // from second set.
+    Iterator<DatanodeDescriptor> iter =
+          first.isEmpty() ? second.iterator() : first.iterator();
+
+    // pick node with least free space
+    while (iter.hasNext() ) {
+      DatanodeDescriptor node = iter.next();
+      long free = node.getRemaining();
+      if (minSpace > free) {
+        minSpace = free;
+        cur = node;
+      }
+    }
+    return cur;
+  }
+}
+
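
The FSNamesystem hunks further down show how the namenode consumes the
policy; roughly (a simplified sketch of those call sites, not additional
code in this patch):

    // chosen once at namenode startup:
    BlockPlacementPolicy replicator =
        BlockPlacementPolicy.getInstance(conf, this, clusterMap);

    // when allocating a new block for file src:
    DatanodeDescriptor[] targets = replicator.chooseTarget(
        src, replication, clientNode, excludedNodes, blockSize);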

Added: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java?rev=1369798&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java (added)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java Mon Aug  6 12:03:13 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+/** 
+ * This interface is used for retrieving the load related statistics of 
+ * the cluster.
+ */
+public interface FSClusterStats {
+
+  /**
+   * an indication of the total load of the cluster.
+   * 
+   * @return a count of the total number of block transfers and block
+   *         writes that are currently occurring on the cluster.
+   */
+
+  public int getTotalLoad() ;
+}
+    
\ No newline at end of file

Added: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java?rev=1369798&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java (added)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java Mon Aug  6 12:03:13 2012
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+/** 
+ * This interface is used by the pluggable block placement policy
+ * to expose a few characteristics of an Inode.
+ */
+public interface FSInodeInfo {
+
+  /**
+   * a string representation of an inode
+   * 
+   * @return the full pathname (from root) that this inode represents
+   */
+
+  public String getFullPathName() ;
+}
+    

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1369798&r1=1369797&r2=1369798&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon Aug  6 12:03:13 2012
@@ -116,6 +116,7 @@ import org.apache.hadoop.net.CachedDNSTo
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -143,7 +144,7 @@ import org.mortbay.util.ajax.JSON;
  * 4)  machine --> blocklist (inverted #2)
  * 5)  LRU cache of updated-heartbeat machines
  ***************************************************/
-public class FSNamesystem implements FSConstants, FSNamesystemMBean,
+public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterStats, 
     NameNodeMXBean, MetricsSource {
   public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
   public static final String AUDIT_FORMAT =
@@ -341,7 +342,7 @@ public class FSNamesystem implements FSC
   private DNSToSwitchMapping dnsToSwitchMapping;
   
   // for block replicas placement
-  ReplicationTargetChooser replicator;
+  BlockPlacementPolicy replicator;
 
   private HostsFileReader hostsReader; 
   private Daemon dnthread = null;
@@ -499,11 +500,7 @@ public class FSNamesystem implements FSC
     this.defaultPermission = PermissionStatus.createImmutable(
        fsOwner.getShortUserName(), supergroup, new FsPermission(filePermission));
 
-
-    this.replicator = new ReplicationTargetChooser(
-                         conf.getBoolean("dfs.replication.considerLoad", true),
-                         this,
-                         clusterMap);
+    this.replicator = BlockPlacementPolicy.getInstance(conf, this, clusterMap);
     this.defaultReplication = conf.getInt("dfs.replication", 3);
     this.maxReplication = conf.getInt("dfs.replication.max", 512);
     this.minReplication = conf.getInt("dfs.replication.min", 1);
@@ -1559,7 +1556,7 @@ public class FSNamesystem implements FSC
    */
   public LocatedBlock getAdditionalBlock(String src, 
                                          String clientName,
-                                         List<Node> excludedNodes
+                                         HashMap<Node, Node> excludedNodes
                                          ) throws IOException {
     long fileLength, blockSize;
     int replication;
@@ -1588,7 +1585,8 @@ public class FSNamesystem implements FSC
     }
 
    // choose targets for the new block to be allocated.
-    DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
+    DatanodeDescriptor targets[] = replicator.chooseTarget(src, 
+                                                           replication,
                                                            clientNode,
                                                            excludedNodes,
                                                            blockSize);
@@ -3020,10 +3018,11 @@ public class FSNamesystem implements FSC
     List<DatanodeDescriptor> containingNodes;
     DatanodeDescriptor srcNode;
     
+    INodeFile fileINode = null;
     synchronized (this) {
       synchronized (neededReplications) {
         // block should belong to a file
-        INodeFile fileINode = blocksMap.getINode(block);
+        fileINode = blocksMap.getINode(block);
         // abandoned block or block reopened for append
         if(fileINode == null || fileINode.isUnderConstruction()) { 
          neededReplications.remove(block, priority); // remove from neededReplications
@@ -3058,9 +3057,11 @@ public class FSNamesystem implements FSC
     }
 
     // choose replication targets: NOT HOLDING THE GLOBAL LOCK
-    DatanodeDescriptor targets[] = replicator.chooseTarget(
+    // It is costly to extract the filename for which chooseTargets is called,
+    // so for now we pass in the Inode itself.
+    DatanodeDescriptor targets[] = replicator.chooseTarget(fileINode,
         requiredReplication - numEffectiveReplicas,
-        srcNode, containingNodes, null, block.getNumBytes());
+        srcNode, containingNodes, block.getNumBytes());
     if(targets.length == 0)
       return false;
 
@@ -3068,7 +3069,7 @@ public class FSNamesystem implements FSC
       synchronized (neededReplications) {
         // Recheck since global lock was released
         // block should belong to a file
-        INodeFile fileINode = blocksMap.getINode(block);
+        fileINode = blocksMap.getINode(block);
         // abandoned block or block reopened for append
         if(fileINode == null || fileINode.isUnderConstruction()) { 
          neededReplications.remove(block, priority); // remove from neededReplications
@@ -3131,12 +3132,13 @@ public class FSNamesystem implements FSC
   }
 
   /** Choose a datanode near to the given address. */ 
-  public DatanodeInfo chooseDatanode(String address, long blocksize) {
+  public DatanodeInfo chooseDatanode(String srcPath, String address, long blocksize) {
     final DatanodeDescriptor clientNode = host2DataNodeMap.getDatanodeByHost(
         address);
     if (clientNode != null) {
+      HashMap<Node,Node> excludedNodes = null;
       final DatanodeDescriptor[] datanodes = replicator.chooseTarget(
-          1, clientNode, null, blocksize);
+          srcPath, 1, clientNode, excludedNodes, blocksize);
       if (datanodes.length > 0) {
         return datanodes[0];
       }
@@ -3925,6 +3927,7 @@ public class FSNamesystem implements FSC
                               Block b, short replication,
                               DatanodeDescriptor addedNode,
                               DatanodeDescriptor delNodeHint) {
+    INodeFile inode = blocksMap.getINode(b);
     // first form a rack to datanodes map and
     HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
       new HashMap<String, ArrayList<DatanodeDescriptor>>();
@@ -3962,24 +3965,13 @@ public class FSNamesystem implements FSC
     boolean firstOne = true;
     while (nonExcess.size() - replication > 0) {
       DatanodeInfo cur = null;
-      long minSpace = Long.MAX_VALUE;
 
       // check if we can del delNodeHint
       if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint) &&
            (priSet.contains(delNodeHint) || (addedNode != null && !priSet.contains(addedNode))) ) {
           cur = delNodeHint;
       } else { // regular excessive replica removal
-        Iterator<DatanodeDescriptor> iter = 
-          priSet.isEmpty() ? remains.iterator() : priSet.iterator();
-          while( iter.hasNext() ) {
-            DatanodeDescriptor node = iter.next();
-            long free = node.getRemaining();
-
-            if (minSpace > free) {
-              minSpace = free;
-              cur = node;
-            }
-          }
+        cur = replicator.chooseReplicaToDelete(inode, b, replication, priSet, remains);          
       }
 
       firstOne = false;
@@ -4716,7 +4708,7 @@ public class FSNamesystem implements FSC
   }
 
   public DatanodeDescriptor getRandomDatanode() {
-    return replicator.chooseTarget(1, null, null, 0)[0];
+    return (DatanodeDescriptor)clusterMap.chooseRandom(NodeBase.ROOT);
   }
 
   /**

