Modified: lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSDataset.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSDataset.java?rev=357197&r1=357196&r2=357197&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSDataset.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSDataset.java Fri Dec 16 09:51:05 2005
@@ -29,7 +29,8 @@
  ***************************************************/
 public class FSDataset implements FSConstants {
     static final double USABLE_DISK_PCT = 0.98;
-    /**
+
+  /**
      * A node type that can be built into a tree reflecting the
      * hierarchy of blocks on the local disk.
      */
@@ -166,6 +167,13 @@
             blkid = blkid >> ((15 - halfByteIndex) * 4);
             return (int) ((0x000000000000000F) & blkid);
         }
+
+        public String toString() {  // dir plus the children array, or "null" if absent
+          return "FSDir{" +
+              "dir=" + dir +
+              ", children=" + (children == null ? null : Arrays.asList(children)) +
+              "}";
+        }
     }
 
     //////////////////////////////////////////////////////
@@ -282,17 +290,23 @@
             ongoingCreates.add(b);
             reserved += BLOCK_SIZE;
             f = getTmpFile(b);
-
-            if (f.exists()) {
-                throw new IOException("Unexpected problem in startBlock() for 
" + b + ".  File " + f + " should not be present, but is.");
-            }
-        }
-
-        //
-        // Create the zero-length temp file
-        //
-        if (!f.createNewFile()) {
-            throw new IOException("Unexpected problem in startBlock() for " + 
b + ".  File " + f + " should be creatable, but is already present.");
+           try {
+               if (f.exists()) {
+                   throw new IOException("Unexpected problem in startBlock() 
for " + b + ".  File " + f + " should not be present, but is.");
+               }
+
+               //
+               // Create the zero-length temp file
+               //
+               if (!f.createNewFile()) {
+                   throw new IOException("Unexpected problem in startBlock() 
for " + b + ".  File " + f + " should be creatable, but is already present.");
+               }
+           } catch (IOException ie) {
+                System.out.println("Exception!  " + ie);
+               ongoingCreates.remove(b);               
+               reserved -= BLOCK_SIZE;
+                throw ie;
+           }
         }
 
         //
@@ -405,4 +419,11 @@
         // REMIND - mjc - should cache this result for performance
         return new File(tmp, b.getBlockName());
     }
+
+    /** Renders this dataset as {@code FSDataset{dirpath='<dirpath>'}}. */
+    public String toString() {
+      String description = "FSDataset{dirpath='" + dirpath + "'}";
+      return description;
+    }
+
 }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSDirectory.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSDirectory.java?rev=357197&r1=357196&r2=357197&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSDirectory.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSDirectory.java Fri Dec 16 09:51:05 2005
@@ -47,7 +47,7 @@
     class INode {
         public String name;
         public INode parent;
-        public Vector children = new Vector();
+        public TreeMap children = new TreeMap();
         public Block blocks[];
 
         /**
@@ -59,61 +59,45 @@
         }
 
         /**
+         * This is the external interface
          */
         INode getNode(String target) {
-            if (! target.startsWith("/")) {
+            if (! target.startsWith("/") || target.length() == 0) {
                 return null;
-            }
-
-            if (parent == null) {
-                if ("/".equals(target)) {
-                    return this;
-                } else {
-                    // Check with children
-                    for (Iterator it = children.iterator(); it.hasNext(); ) {
-                        INode child = (INode) it.next();
-                        INode result = child.getNode(target);
-                        if (result != null) {
-                            return result;
-                        }
-                    }
-                }
+            } else if (parent == null && "/".equals(target)) {
+                return this;
             } else {
-                // Strip the leading slash
-                if (target.length() > 1) {
-                    target = target.substring(1);
-                }
-
-                // Check if it's the current node
-                if (name.equals(target)) {
-                    return this;
+                Vector components = new Vector();
+                int start = 0;
+                int slashid = 0;
+                while (start < target.length() && (slashid = 
target.indexOf('/', start)) >= 0) {
+                    components.add(target.substring(start, slashid));
+                    start = slashid + 1;
                 }
-
-                // Get the chunk up to the next slash
-                String curComponent, remainder;
-                int slash = target.indexOf('/');
-                if (slash < 0) {
-                    return null;
-                } else {
-                    curComponent = target.substring(0, slash);
-                    remainder = target.substring(slash);
+                if (start < target.length()) {
+                    components.add(target.substring(start));
                 }
+                return getNode(components, 0);
+            }
+        }
 
-                // Make sure we're on the right track
-                if (! name.equals(curComponent)) {
-                    return null;
-                } 
+        /**
+         */
+        INode getNode(Vector components, int index) {
+            if (! name.equals((String) components.elementAt(index))) {
+                return null;
+            }
+            if (index == components.size()-1) {
+                return this;
+            }
 
-                // Check with children
-                for (Iterator it = children.iterator(); it.hasNext(); ) {
-                    INode child = (INode) it.next();
-                    INode result = child.getNode(remainder);
-                    if (result != null) {
-                        return result;
-                    }
-                }
+            // Check with children
+            INode child = (INode) children.get(components.elementAt(index+1));
+            if (child == null) {
+                return null;
+            } else {
+                return child.getNode(components, index+1);
             }
-            return null;
         }
 
         /**
@@ -133,7 +117,7 @@
                 } else {
                     String targetName = new File(target).getName();
                     INode newItem = new INode(targetName, parentNode, blks);
-                    parentNode.children.add(newItem);
+                    parentNode.children.put(targetName, newItem);
                     return newItem;
                 }
             }
@@ -141,13 +125,29 @@
 
         /**
          */
-        INode removeNode(String target) {
-            INode targetNode = getNode(target);
-            if (targetNode == null) {
-                return null;
+        boolean removeNode() {  // detach from parent; returns false for the root (parent == null)
+            if (parent == null) {
+                return false;
             } else {
-                targetNode.parent.children.remove(targetNode);
-                return targetNode;
+                parent.children.remove(name);  // children are keyed by name; this.parent is left set
+                return true;
+            }
+        }
+
+        /**
+         * Collect all the blocks at this INode and all its children.
+         * This operation is performed after a node is removed from the tree,
+         * and we want to GC all the blocks at this node and below.
+         */
+        void collectSubtreeBlocks(Vector v) {  // appends this node's blocks, then each child's, recursively
+            if (blocks != null) {
+                for (int i = 0; i < blocks.length; i++) {
+                    v.add(blocks[i]);
+                }
+            }
+            for (Iterator it = children.values().iterator(); it.hasNext(); ) {
+                INode child = (INode) it.next();
+                child.collectSubtreeBlocks(v);
             }
         }
 
@@ -155,7 +155,7 @@
          */
         int numItemsInTree() {
             int total = 0;
-            for (Iterator it = children.iterator(); it.hasNext(); ) {
+            for (Iterator it = children.values().iterator(); it.hasNext(); ) {
                 INode child = (INode) it.next();
                 total += child.numItemsInTree();
             }
@@ -188,7 +188,7 @@
          */
         long computeContentsLength() {
             long total = computeFileLength();
-            for (Iterator it = children.iterator(); it.hasNext(); ) {
+            for (Iterator it = children.values().iterator(); it.hasNext(); ) {
                 INode child = (INode) it.next();
                 total += child.computeContentsLength();
             }
@@ -202,7 +202,7 @@
                 v.add(this);
             }
 
-            for (Iterator it = children.iterator(); it.hasNext(); ) {
+            for (Iterator it = children.values().iterator(); it.hasNext(); ) {
                 INode child = (INode) it.next();
                 v.add(child);
             }
@@ -224,7 +224,7 @@
                     }
                 }
             }
-            for (Iterator it = children.iterator(); it.hasNext(); ) {
+            for (Iterator it = children.values().iterator(); it.hasNext(); ) {
                 INode child = (INode) it.next();
                 child.saveImage(fullName, out);
             }
@@ -502,21 +502,24 @@
      */
     boolean unprotectedRenameTo(UTF8 src, UTF8 dst) {
         synchronized(rootDir) {
-            INode removedNode = rootDir.removeNode(src.toString());
+            INode removedNode = rootDir.getNode(src.toString());
             if (removedNode == null) {
                 return false;
             }
-
+            removedNode.removeNode();
+            if (isDir(dst)) {
+                dst = new UTF8(dst.toString() + "/" + new 
File(src.toString()).getName());
+            }
             INode newNode = rootDir.addNode(dst.toString(), 
removedNode.blocks);
             if (newNode != null) {
                 newNode.children = removedNode.children;
-                for (Iterator it = newNode.children.iterator(); it.hasNext(); 
) {
+                for (Iterator it = newNode.children.values().iterator(); 
it.hasNext(); ) {
                     INode child = (INode) it.next();
                     child.parent = newNode;
                 }
                 return true;
             } else {
-                removedNode.parent.children.add(removedNode);
+                rootDir.addNode(src.toString(), removedNode.blocks);
                 return false;
             }
         }
@@ -539,25 +542,21 @@
             if (targetNode == null) {
                 return null;
             } else {
-                Vector allBlocks = new Vector();
-                Vector contents = new Vector();
-                targetNode.listContents(contents);
-
-                for (Iterator it = contents.iterator(); it.hasNext(); ) {
-                    INode cur = (INode) it.next();
-                    INode removedNode = rootDir.removeNode(cur.computeName());
-                    if (removedNode != null) {
-                        Block blocks[] = removedNode.blocks;
-                        if (blocks != null) {
-                            for (int i = 0; i < blocks.length; i++) {
-                                activeBlocks.remove(blocks[i]);
-                                allBlocks.add(blocks[i]);
-                            }
-                        }
+                //
+                // Remove the node from the namespace and GC all
+                // the blocks underneath the node.
+                //
+                if (! targetNode.removeNode()) {
+                    return null;
+                } else {
+                    Vector v = new Vector();
+                    targetNode.collectSubtreeBlocks(v);
+                    for (Iterator it = v.iterator(); it.hasNext(); ) {
+                        Block b = (Block) it.next();
+                        activeBlocks.remove(b);
                     }
+                    return (Block[]) v.toArray(new Block[v.size()]);
                 }
-                rootDir.removeNode(src.toString());
-                return (Block[]) allBlocks.toArray(new Block[0]);
             }
         }
     }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSNamesystem.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSNamesystem.java?rev=357197&r1=357196&r2=357197&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSNamesystem.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/ndfs/FSNamesystem.java Fri Dec 16 09:51:05 2005
@@ -38,12 +38,24 @@
     final static int DESIRED_REPLICATION =
       NutchConf.get().getInt("ndfs.replication", 3);
 
+    // The maximum number of replicates we should allow for a single block
+    final static int MAX_REPLICATION = DESIRED_REPLICATION;
+
+    // How many outgoing replication streams a given node should have at one 
time
+    final static int MAX_REPLICATION_STREAMS = 
NutchConf.get().getInt("ndfs.max-repl-streams", 2);
+
     // MIN_REPLICATION is how many copies we need in place or else we disallow 
the write
     final static int MIN_REPLICATION = 1;
 
     // HEARTBEAT_RECHECK is how often a datanode sends its hearbeat
     final static long HEARTBEAT_RECHECK = 1000;
 
+    // Whether we should use disk-availability info when determining target
+    final static boolean USE_AVAILABILITY = 
NutchConf.get().getBoolean("ndfs.availability.allocation", false);
+
+    private boolean allowSameHostTargets =
+        NutchConf.get().getBoolean("test.ndfs.same.host.targets.allowed", 
false);
+
     //
     // Stores the correct file name hierarchy
     //
@@ -70,6 +82,13 @@
     TreeMap recentInvalidateSets = new TreeMap();
 
     //
+    // Keeps a TreeSet for every named node.  Each treeset contains
+    // a list of the blocks that are "extra" at that location.  We'll
+    // eventually remove these extras.
+    //
+    TreeMap excessReplicateMap = new TreeMap();
+
+    //
     // Keeps track of files that are being created, plus the
     // blocks that make them up.
     //
@@ -111,14 +130,14 @@
     // Store set of Blocks that need to be replicated 1 or more times.
     // We also store pending replication-orders.
     //
-    TreeSet neededReplications = new TreeSet();
-    TreeSet pendingReplications = new TreeSet();
+    private TreeSet neededReplications = new TreeSet();
+    private TreeSet pendingReplications = new TreeSet();
 
     //
     // Used for handling lock-leases
     //
-    TreeMap leases = new TreeMap();
-    TreeSet sortedLeases = new TreeSet();
+    private TreeMap leases = new TreeMap();
+    private TreeSet sortedLeases = new TreeSet();
 
     //
     // Threaded object that checks to see if we have been
@@ -143,17 +162,32 @@
         this.systemStart = System.currentTimeMillis();
     }
 
-    /**
+    /** Close down this filesystem manager.
+     * Causes heartbeat and lease daemons to stop; waits briefly for
+     * them to finish, but a short timeout returns control back to caller.
+     * If the calling thread is interrupted while waiting, the remaining join
+     * is still attempted and the interrupt status is restored on return.
      */
     public void close() {
+      synchronized (this) {
         fsRunning = false;
+      }
+        boolean interrupted = false;
         try {
-            hbthread.join();
-        } catch (InterruptedException ie) {
-        }
-        try {
-            lmthread.join();
+            hbthread.join(3000);
         } catch (InterruptedException ie) {
+            interrupted = true;
+        } finally {
+          // using finally to ensure we also wait for lease daemon
+          try {
+            lmthread.join(3000);
+          } catch (InterruptedException ie) {
+            interrupted = true;
+          }
+          if (interrupted) {
+            // restore the interrupt status rather than silently swallowing it
+            Thread.currentThread().interrupt();
+          }
         }
     }
 
@@ -202,6 +227,9 @@
      * of machines.  The first on this list should be where the client 
      * writes data.  Subsequent items in the list must be provided in
      * the connection to the first datanode.
+     * @return Return an array that consists of the block, plus a set
+     * of machines, or null if src is invalid for creation (based on
+     * [EMAIL PROTECTED] FSDirectory#isValidToCreate(UTF8)}.
      */
     public synchronized Object[] startFile(UTF8 src, UTF8 holder, boolean 
overwrite) {
         Object results[] = null;
@@ -218,6 +246,8 @@
                 // Get the array of replication targets 
                 DatanodeInfo targets[] = chooseTargets(DESIRED_REPLICATION, 
null);
                 if (targets.length < MIN_REPLICATION) {
+                    LOG.warning("Target-length is " + targets.length +
+                        ", below MIN_REPLICATION (" + MIN_REPLICATION + ")");
                     return null;
                 }
 
@@ -240,7 +270,11 @@
                 // Create next block
                 results[0] = allocateBlock(src);
                 results[1] = targets;
+            } else { // ! fileValid
+              LOG.warning("Cannot start file because it is invalid. src=" + 
src);
             }
+        } else {
+            LOG.warning("Cannot start file because pendingCreates is non-null. 
src=" + src);
         }
         return results;
     }
@@ -274,16 +308,6 @@
                 // Create next block
                 results[0] = allocateBlock(src);
                 results[1] = targets;
-            } else {
-                LOG.info("File progress failure for " + src);
-                Vector v = (Vector) pendingCreates.get(src);
-                for (Iterator it = v.iterator(); it.hasNext(); ) {
-                    Block b = (Block) it.next();
-                    TreeSet containingNodes = (TreeSet) blocksMap.get(b);
-                    if (containingNodes == null || containingNodes.size() < 
MIN_REPLICATION) {
-                        LOG.info("Problem with block " + b + ", with " + 
(containingNodes == null ? "0" : "" + containingNodes.size()) + " nodes 
reporting in.");
-                    }
-                }
             }
         }
         return results;
@@ -311,6 +335,13 @@
     }
 
     /**
+     * Abandon the entire file in progress: drop src from the pending creates and forget its pending blocks.
+     */
+    public synchronized void abandonFileInProgress(UTF8 src) throws IOException {
+        internalReleaseCreate(src);
+    }
+
+    /**
      * Finalize the created file and make it world-accessible.  The
      * FSNamesystem will already know the blocks that make up the file.
      * Before we return, we make sure that all the file's blocks have 
@@ -318,6 +349,7 @@
      */
     public synchronized int completeFile(UTF8 src, UTF8 holder) {
         if (dir.getFile(src) != null || pendingCreates.get(src) == null) {
+           LOG.info("Failed to complete " + src + "  because dir.getFile()==" 
+ dir.getFile(src) + " and " + pendingCreates.get(src));
             return OPERATION_FAILED;
         } else if (! checkFileProgress(src)) {
             return STILL_WAITING;
@@ -368,18 +400,29 @@
                     }
                 }
 
+                //
+                // REMIND - mjc - this should be done only after we wait a few 
secs.
+                // The namenode isn't giving datanodes enough time to report 
the
+                // replicated blocks that are automatically done as part of a 
client
+                // write.
+                //
+
                 // Now that the file is real, we need to be sure to replicate
                 // the blocks.
                 for (int i = 0; i < pendingBlocks.length; i++) {
                     TreeSet containingNodes = (TreeSet) 
blocksMap.get(pendingBlocks[i]);
                     if (containingNodes.size() < DESIRED_REPLICATION) {
                         synchronized (neededReplications) {
+                            LOG.info("Completed file " + src + ", at holder " 
+ holder + ".  There is/are only " + containingNodes.size() + " copies of block 
" + pendingBlocks[i] + ", so replicating up to " + DESIRED_REPLICATION);
                             neededReplications.add(pendingBlocks[i]);
                         }
                     }
                 }
                 return COMPLETE_SUCCESS;
+            } else {
+                System.out.println("AddFile() for " + src + " failed");
             }
+           LOG.info("Dropped through on file add....");
         }
 
         return OPERATION_FAILED;
@@ -486,39 +529,67 @@
 
     /**
      * Figure out a few hosts that are likely to contain the
-     * block referred to by the given filename, offset pair.
+     * block(s) referred to by the given (filename, start, len) tuple.
+     * @return one array of host names per block overlapping the range, or an
+     * empty array if src is not a file, start or len is negative, or the
+     * range is not covered by the file's blocks.
      */
-    public UTF8[] getDatanodeHints(UTF8 src, long offset) {
-        Block targetBlock = null;
+    public UTF8[][] getDatanodeHints(UTF8 src, long start, long len) {
+        if (start < 0 || len < 0) {
+            return new UTF8[0][];
+        }
+
+        int startBlock = -1;
+        int endBlock = -1;
         Block blocks[] = dir.getFile(src);
+        if (blocks == null) {
+            return new UTF8[0][];
+        }
 
         //
-        // First, figure out where the offset would fall in
+        // First, figure out where the range falls in
         // the blocklist.
         //
+        long startpos = start;
+        long endpos = start + len;
         for (int i = 0; i < blocks.length; i++) {
-            offset -= blocks[i].getNumBytes();
-            if (offset <= 0) {
-                targetBlock = blocks[i];
-                break;
+            if (startpos >= 0) {
+                startpos -= blocks[i].getNumBytes();
+                if (startpos <= 0) {
+                    startBlock = i;
+                }
+            }
+            if (endpos >= 0) {
+                endpos -= blocks[i].getNumBytes();
+                if (endpos <= 0) {
+                    endBlock = i;
+                    break;
+                }
             }
         }
 
         //
-        // Next, create an array of hosts where that block can
+        // Next, create an array of hosts where each block can
         // be found
         //
-        UTF8 hosts[] = null;
-        if (targetBlock != null) {
-            TreeSet containingNodes = (TreeSet) blocksMap.get(targetBlock);
-            Vector v = new Vector();
-            for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {
-                DatanodeInfo cur = (DatanodeInfo) it.next();
-                v.add(cur.getHost());
+        if (startBlock < 0 || endBlock < 0) {
+            return new UTF8[0][];
+        } else {
+            UTF8 hosts[][] = new UTF8[endBlock - startBlock + 1][];
+            for (int i = startBlock; i <= endBlock; i++) {
+                TreeSet containingNodes = (TreeSet) blocksMap.get(blocks[i]);
+                Vector v = new Vector();
+                if (containingNodes != null) {
+                    for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {
+                        DatanodeInfo cur = (DatanodeInfo) it.next();
+                        v.add(cur.getHost());
+                    }
+                }
+                // hosts[] is indexed relative to startBlock, not by absolute block number
+                hosts[i - startBlock] = (UTF8[]) v.toArray(new UTF8[v.size()]);
             }
-            hosts = (UTF8[]) v.toArray(new UTF8[v.size()]);
+            return hosts;
         }
-        return hosts;
     }
 
     /************************************************************
@@ -535,7 +597,6 @@
         TreeSet creates = new TreeSet();
 
         public Lease(UTF8 holder) {
-            LOG.info("New lease, holder " + holder);
             this.holder = holder;
             renew();
         }
@@ -570,7 +631,10 @@
                 internalReleaseLock(src, holder);
             }
             locks.clear();
-            internalReleaseCreates(creates);
+            for (Iterator it = creates.iterator(); it.hasNext(); ) {
+                UTF8 src = (UTF8) it.next();
+                internalReleaseCreate(src);
+            }
             creates.clear();
         }
 
@@ -674,14 +738,19 @@
     private int internalReleaseLock(UTF8 src, UTF8 holder) {
         return dir.releaseLock(src, holder);
     }
-    private void internalReleaseCreates(TreeSet creates) {
-        for (Iterator it = creates.iterator(); it.hasNext(); ) {
-            UTF8 src = (UTF8) it.next();
-            Vector v = (Vector) pendingCreates.remove(src);
-            for (Iterator it2 = v.iterator(); it2.hasNext(); ) {
-                Block b = (Block) it2.next();
-                pendingCreateBlocks.remove(b);
-            }
+    /**
+     * Forget the pending create for src and all of its pending blocks.
+     * A no-op if src is not (or is no longer) pending, such as a file that
+     * may already have been released via abandonFileInProgress().
+     */
+    private void internalReleaseCreate(UTF8 src) {
+        Vector v = (Vector) pendingCreates.remove(src);
+        if (v == null) {
+            return;
+        }
+        for (Iterator it2 = v.iterator(); it2.hasNext(); ) {
+            Block b = (Block) it2.next();
+            pendingCreateBlocks.remove(b);
         }
     }
 
@@ -695,7 +756,6 @@
                 sortedLeases.remove(lease);
                 lease.renew();
                 sortedLeases.add(lease);
-                LOG.info("Renewed lease " + lease);
             }
         }
     }
@@ -726,6 +786,7 @@
                 DatanodeInfo nodeinfo = (DatanodeInfo) datanodeMap.get(name);
 
                 if (nodeinfo == null) {
+                    LOG.info("Got brand-new heartbeat from " + name);
                     nodeinfo = new DatanodeInfo(name, capacity, remaining);
                     datanodeMap.put(name, nodeinfo);
                     capacityDiff = capacity;
@@ -798,7 +859,7 @@
      * The given node is reporting all its blocks.  Use this info to 
      * update the (machine-->blocklist) and (block-->machinelist) tables.
      */
-    public synchronized void processReport(Block newReport[], UTF8 name) {
+    public synchronized Block[] processReport(Block newReport[], UTF8 name) {
         DatanodeInfo node = (DatanodeInfo) datanodeMap.get(name);
         if (node == null) {
             throw new IllegalArgumentException("Unexpected exception.  
Received block report from node " + name + ", but there is no info for " + 
name);
@@ -842,6 +903,29 @@
         // Modify node so it has the new blockreport
         //
         node.updateBlocks(newReport);
+
+        //
+        // We've now completely updated the node's block report profile.
+        // We now go through all its blocks and find which ones are invalid,
+        // no longer pending, or over-replicated.
+        //
+        // (Note it's not enough to just invalidate blocks at lease expiry 
+        // time; datanodes can go down before the client's lease on 
+        // the failed file expires and miss the "expire" event.)
+        //
+        // This function considers every block on a datanode, and thus
+        // should only be invoked infrequently.
+        //
+        Vector obsolete = new Vector();
+        for (Iterator it = node.getBlockIterator(); it.hasNext(); ) {
+            Block b = (Block) it.next();
+
+            if (! dir.isValidBlock(b) && ! pendingCreateBlocks.contains(b)) {
+                LOG.info("Obsoleting block " + b);
+                obsolete.add(b);
+            }
+        }
+        return (Block[]) obsolete.toArray(new Block[obsolete.size()]);
     }
 
     /**
@@ -870,7 +954,64 @@
                         neededReplications.add(block);
                     }
                 }
+
+                //
+                // Find how many of the containing nodes are "extra", if any.
+                // If there are any extras, call chooseExcessReplicates() to
+                // mark them in the excessReplicateMap.
+                //
+                Vector nonExcess = new Vector();
+                for (Iterator it = containingNodes.iterator(); it.hasNext(); ) 
{
+                    DatanodeInfo cur = (DatanodeInfo) it.next();
+                    TreeSet excessBlocks = (TreeSet) 
excessReplicateMap.get(cur.getName());
+                    if (excessBlocks == null || ! 
excessBlocks.contains(block)) {
+                        nonExcess.add(cur);
+                    }
+                }
+                if (nonExcess.size() > MAX_REPLICATION) {
+                    chooseExcessReplicates(nonExcess, block, MAX_REPLICATION); 
   
+                }
+            }
+        }
+    }
+
+    /**
+     * We want a max of "maxReps" replicates for any block, but we now have too many.
+     * In this method, remove randomly chosen nodes from 'nonExcess' until
+     * nonExcess.size() == maxReps, recording block 'b' as excess at each removed
+     * node and queueing 'b' on that node's invalidate list.
+     *
+     * For now, we choose nodes randomly.  In the future, we might enforce some
+     * kind of policy (like making sure replicates are spread across racks).
+     */
+    void chooseExcessReplicates(Vector nonExcess, Block b, int maxReps) {
+        while (nonExcess.size() - maxReps > 0) {
+            int chosenNode = r.nextInt(nonExcess.size());
+            DatanodeInfo cur = (DatanodeInfo) nonExcess.elementAt(chosenNode);
+            nonExcess.removeElementAt(chosenNode);
+
+            TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName());
+            if (excessBlocks == null) {
+                excessBlocks = new TreeSet();
+                excessReplicateMap.put(cur.getName(), excessBlocks);
+            }
+            excessBlocks.add(b);
+
+            //
+            // The excessReplicateMap set tracks blocks until we get confirmation
+            // that the datanode has deleted them; the only way we remove them
+            // is when we get a "removeBlock" message.
+            //
+            // The 'invalidate' list is used to inform the datanode the block
+            // should be deleted.  Items are removed from the invalidate list
+            // upon giving instructions to the datanode (see blocksToInvalidate).
+            //
+            Vector invalidateSet = (Vector) recentInvalidateSets.get(cur.getName());
+            if (invalidateSet == null) {
+                invalidateSet = new Vector();
+                recentInvalidateSets.put(cur.getName(), invalidateSet);
             }
+            invalidateSet.add(b);
         }
     }
 
@@ -896,6 +1037,18 @@
                 neededReplications.add(block);
             }
         }
+
+        //
+        // We've removed a block from a node, so it's definitely no longer
+        // in "excess" there.
+        //
+        TreeSet excessBlocks = (TreeSet) 
excessReplicateMap.get(node.getName());
+        if (excessBlocks != null) {
+            excessBlocks.remove(block);
+            if (excessBlocks.size() == 0) {
+                excessReplicateMap.remove(node.getName());
+            }
+        }
     }
 
     /**
@@ -956,49 +1109,14 @@
     /////////////////////////////////////////////////////////
 
     /**
-     * Return with a list of Blocks that should be invalidated
-     * at the given node.  Done in response to a file delete, which 
-     * eliminates a number of blocks from the universe.
-     */
-    public synchronized Block[] recentlyInvalidBlocks(UTF8 name) {
-        Vector invalidateSet = (Vector) recentInvalidateSets.remove(name);
-        if (invalidateSet == null) {
-            return null;
-        } else {
-            return (Block[]) invalidateSet.toArray(new 
Block[invalidateSet.size()]);
-        }
-    }
-
-    /**
-     * If the node has not been checked in some time, go through
-     * its blocks and find which ones are neither valid nor pending.
-     * It often happens that a client will start writing blocks and
-     * then exit.  The blocks are on-disk, but the file will be 
-     * abandoned.
-     *
-     * It's not enough to invalidate blocks at lease expiry time;
-     * datanodes can go down before the client's lease on 
-     * the failed file expires and miss the "expire" event.
-     *
-     * This function considers every block on a datanode, and thus
-     * should only be invoked infrequently.
+     * Return, and clear, the blocks queued for deletion on the given datanode (null if none).
      */
-    public synchronized Block[] checkObsoleteBlocks(UTF8 name) {
-        DatanodeInfo nodeInfo = (DatanodeInfo) datanodeMap.get(name);
-        if (System.currentTimeMillis() - nodeInfo.lastObsoleteCheck() <= 
OBSOLETE_INTERVAL) {
-            return null;
+    public synchronized Block[] blocksToInvalidate(UTF8 sender) {
+        Vector invalidateSet = (Vector) recentInvalidateSets.remove(sender); // remove() also empties the queue
+        if (invalidateSet != null) {
+            return (Block[]) invalidateSet.toArray(new 
Block[invalidateSet.size()]);
         } else {
-            nodeInfo.updateObsoleteCheck();
-            Vector obsolete = new Vector();
-            for (Iterator it = nodeInfo.getBlockIterator(); it.hasNext(); ) {
-                Block b = (Block) it.next();
-
-                if (! dir.isValidBlock(b) && ! 
pendingCreateBlocks.contains(b)) {
-                    LOG.info("Obsoleting block " + b);
-                    obsolete.add(b);
-                }
-            }
-            return (Block[]) obsolete.toArray(new Block[obsolete.size()]);
+            return null; // nothing is queued for this datanode
         }
     }
 
@@ -1012,17 +1130,10 @@
      *     target sequence for the Block at the appropriate index.
      *
      */
-    public synchronized Object[] pendingTransfers(DatanodeInfo srcNode, int 
maxXfers) {
-        //
-        // Allow the namenode to come up and hear from all datanodes before
-        // making transfers.
-        //
-        if (System.currentTimeMillis() - systemStart < SYSTEM_STARTUP_PERIOD) {
-            return null;
-        }
-
+    public synchronized Object[] pendingTransfers(DatanodeInfo srcNode, int 
xmitsInProgress) {
         synchronized (neededReplications) {
             Object results[] = null;
+           int scheduledXfers = 0;
 
             if (neededReplications.size() > 0) {
                 //
@@ -1036,7 +1147,7 @@
                     //
                     // We can only reply with 'maxXfers' or fewer blocks
                     //
-                    if (replicateBlocks.size() >= maxXfers) {
+                    if (scheduledXfers >= MAX_REPLICATION_STREAMS - 
xmitsInProgress) {
                         break;
                     }
 
@@ -1045,13 +1156,13 @@
                         it.remove();
                     } else {
                         TreeSet containingNodes = (TreeSet) 
blocksMap.get(block);
-
                         if (containingNodes.contains(srcNode)) {
-                            DatanodeInfo targets[] = 
chooseTargets(DESIRED_REPLICATION - containingNodes.size(), containingNodes);
+                            DatanodeInfo targets[] = 
chooseTargets(Math.min(DESIRED_REPLICATION - containingNodes.size(), 
MAX_REPLICATION_STREAMS - xmitsInProgress), containingNodes);
                             if (targets.length > 0) {
                                 // Build items to return
                                 replicateBlocks.add(block);
                                 replicateTargetSets.add(targets);
+                               scheduledXfers += targets.length;
                             }
                         }
                     }
@@ -1059,9 +1170,9 @@
 
                 //
                 // Move the block-replication into a "pending" state.
-                // REMIND - mjc - the reason we use 'pending' is so we can 
retry
+                // The reason we use 'pending' is so we can retry
                 // replications that fail after an appropriate amount of time. 
 
-                // This is not yet implemented
+                // (REMIND - mjc - this timer is not yet implemented.)
                 //
                 if (replicateBlocks.size() > 0) {
                     int i = 0;
@@ -1074,13 +1185,14 @@
                             neededReplications.remove(block);
                             pendingReplications.add(block);
                         }
+
+                       LOG.info("Pending transfer (block " + 
block.getBlockName() + ") from " + srcNode.getName() + " to " + targets.length 
+ " destinations");
                     }
 
                     //
                     // Build returned objects from above lists
                     //
                     DatanodeInfo targetMatrix[][] = new 
DatanodeInfo[replicateTargetSets.size()][];
-                    LOG.info("Pending transfer from " + srcNode.getName() + " 
to " + targetMatrix.length + " destinations");
                     for (i = 0; i < targetMatrix.length; i++) {
                         targetMatrix[i] = (DatanodeInfo[]) 
replicateTargetSets.elementAt(i);
                     }
@@ -1094,10 +1206,13 @@
         }
     }
 
-
     /**
-     * Get a certain number of targets, if possible.  If not,
-     * return as many as we can.
+     * Get a certain number of targets, if possible.
+     * If not, return as many as we can.
+     * @param desiredReplicates number of targets wanted.
+     * @param forbiddenNodes set of DatanodeInfo instances that should not be
+     * considered targets.
+     * @return array of DatanodeInfo instances to use as targets; may be shorter than requested.
      */
     DatanodeInfo[] chooseTargets(int desiredReplicates, TreeSet 
forbiddenNodes) {
         TreeSet alreadyChosen = new TreeSet();
@@ -1109,7 +1224,7 @@
                 targets.add(target);
                 alreadyChosen.add(target);
             } else {
-                break;
+                break; // calling chooseTarget again won't help
             }
         }
         return (DatanodeInfo[]) targets.toArray(new 
DatanodeInfo[targets.size()]);
@@ -1122,90 +1237,99 @@
      * Right now it chooses randomly from available boxes.  In future could 
      * choose according to capacity and load-balancing needs (or even 
      * network-topology, to avoid inter-switch traffic).
+     * @param forbidden1 DatanodeInfo targets not allowed, null allowed.
+     * @param forbidden2 DatanodeInfo targets not allowed, null allowed.
+     * @return DatanodeInfo instance to use or null if something went wrong
+     * (a log message is emitted if null is returned).
      */
-    DatanodeInfo chooseTarget(TreeSet alreadyHasNode, TreeSet alreadyChosen) {
+    DatanodeInfo chooseTarget(TreeSet forbidden1, TreeSet forbidden2) {
+        //
+        // Bail out early if no datanodes are registered at all
+        //
         int totalMachines = datanodeMap.size();
         if (totalMachines == 0) {
+            LOG.warning("While choosing target, totalMachines is " + 
totalMachines);
             return null;
         }
-        int freeMachines = totalMachines;
+
+        TreeSet forbiddenMachines = new TreeSet();
+        //
+        // In addition to already-chosen datanode/port pairs, we want to avoid
+        // already-chosen machinenames.  (There can be multiple datanodes per
+        // machine.)  We might relax this requirement in the future, though. 
(Maybe
+        // so that at least one replicate is off the machine.)
+        //
+        UTF8 hostOrHostAndPort = null;
+        if (forbidden1 != null) {
+          // add host:port (or just host) of each node in forbidden1 to 
forbiddenMachines
+            for (Iterator it = forbidden1.iterator(); it.hasNext(); ) {
+                DatanodeInfo cur = (DatanodeInfo) it.next();
+                if (allowSameHostTargets) {
+                  hostOrHostAndPort = cur.getName(); // forbid same host:port
+                } else {
+                  hostOrHostAndPort = cur.getHost(); // forbid same host
+                }
+                forbiddenMachines.add(hostOrHostAndPort);
+            }
+        }
+        if (forbidden2 != null) {
+          // add host:port (or just host) of each node in forbidden2 to 
forbiddenMachines
+            for (Iterator it = forbidden2.iterator(); it.hasNext(); ) {
+                DatanodeInfo cur = (DatanodeInfo) it.next();
+              if (allowSameHostTargets) {
+                hostOrHostAndPort = cur.getName(); // forbid same host:port
+              } else {
+                hostOrHostAndPort = cur.getHost(); // forbid same host
+              }
+              forbiddenMachines.add(hostOrHostAndPort);
+            }
+        }
+
+        //
+        // Now build list of machines we can actually choose from
+        //
+        long totalRemaining = 0;
+        Vector targetList = new Vector();
         for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
             DatanodeInfo node = (DatanodeInfo) it.next();
-            if ((alreadyHasNode != null && alreadyHasNode.contains(node)) ||
-                (alreadyChosen != null && alreadyChosen.contains(node))) {
-                freeMachines--;
+            if (allowSameHostTargets) {
+                hostOrHostAndPort = node.getName(); // match host:port
+            } else {
+                hostOrHostAndPort = node.getHost(); // match host
+            }
+            if (! forbiddenMachines.contains(hostOrHostAndPort)) {
+                targetList.add(node);
+                totalRemaining += node.getRemaining(); // sum used by the capacity-weighted pick
             }
         }
 
         //
         // Now pick one
         //
-        DatanodeInfo target = null;
-        if (freeMachines > 0) {
-            //
-            // Get all possible targets
-            //
-            int i = 0;
-            DatanodeInfo targetlist[] = new DatanodeInfo[totalMachines];
-            for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); 
i++) {
-                targetlist[i] = (DatanodeInfo) it.next();
-            }
-        
-            do {
-                int index = r.nextInt(totalMachines);
-                target = targetlist[index];
-
-                if ((alreadyHasNode != null && 
alreadyHasNode.contains(target)) ||
-                    (alreadyChosen != null && alreadyChosen.contains(target))) 
{
-                    target = null;
-                }
-            } while (target == null);
-        }
-        return target;
-
-        /**
-         * Choose target weighted by available storage
-         */
-        /**
-        synchronized (datanodeMap) {
-            if (datanodeMap.size() == 0) {
-                return;
-            }
+        if (targetList.size() == 0) {
+            LOG.warning("Zero targets found, forbidden1.size=" +
+                ( forbidden1 != null ? forbidden1.size() : 0 ) +
+                " allowSameHostTargets=" + allowSameHostTargets +
+                " forbidden2.size()=" +
+                ( forbidden2 != null ? forbidden2.size() : 0 ));
+            return null;
+        } else if (! USE_AVAILABILITY) { // uniform random choice among allowed nodes
+            int target = r.nextInt(targetList.size());
+            return (DatanodeInfo) targetList.elementAt(target);
+        } else {
+            // Choose a node with probability proportional to its getRemaining() capacity
+            double target = r.nextDouble() * totalRemaining;
 
-            long totalRemaining = 0;
-            Vector okTargets = new Vector();
-            for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); 
) {
+            for (Iterator it = targetList.iterator(); it.hasNext(); ) {
                 DatanodeInfo node = (DatanodeInfo) it.next();
-                if ((alreadyHasNode == null || ! 
alreadyHasNode.contains(node)) &&
-                    (alreadyChosen == null || ! alreadyChosen.contains(node))) 
{
-                    okTargets.add(node);
-                    totalRemaining += node.getRemaining();
+                target -= node.getRemaining(); // walk the cumulative capacity until we pass the random point
+                if (target <= 0) {
+                    return node;
                 }
             }
 
-            //
-            // Now pick one
-            //
-            DatanodeInfo target = null;
-            if (okTargets.size() > 0) {
-                //
-                // Repeatedly choose random byte of the total bytes free.
-                // The machine that has that byte will be our target.  Thus,
-                // we select at random with bias toward machines with greater
-                // available storage.
-                //
-                long targetByte = r.nextLong(totalRemaining);
-                for (Iterator it = okTargets.iterator(); it.hasNext(); ) {
-                    DatanodeInfo node = (DatanodeInfO) it.next();
-                    targetByte -= node.getRemaining();
-                    if (targetByte <= 0) {
-                        target = node;
-                        break;
-                    }
-                }
-            }
-            return target;
+            LOG.warning("Impossible state.  When trying to choose target node, 
could not find any.  This may indicate that datanode capacities are being 
updated during datanode selection.  Anyway, now returning an arbitrary target 
to recover...");
+            return (DatanodeInfo) 
targetList.elementAt(r.nextInt(targetList.size()));
         }
-        **/
     }
 }


Reply via email to