Author: mc
Date: Mon Aug  1 11:15:30 2005
New Revision: 226851

URL: http://svn.apache.org/viewcvs?rev=226851&view=rev
Log:

  The client will re-ask the NameNode for block info if
it is unable to find a given block at any DataNode.

  Also, add more LOG messages so it is easier to see what the client is doing.


Modified:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java?rev=226851&r1=226850&r2=226851&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java Mon Aug  1 11:15:30 2005
@@ -34,6 +34,7 @@
 public class NDFSClient implements FSConstants {
     public static final Logger LOG = 
LogFormatter.getLogger("org.apache.nutch.fs.NDFSClient");
     static int BUFFER_SIZE = 4096;
+    static int MAX_BLOCK_ACQUIRE_FAILURES = 10;
     ClientProtocol namenode;
     boolean running = true;
     Random r = new Random();
@@ -68,15 +69,7 @@
      */
     public NFSInputStream open(UTF8 src) throws IOException {
         // Get block info from namenode
-        LocatedBlock results[] = namenode.open(src.toString());
-
-        Vector blocks = new Vector();
-        Vector locs = new Vector();
-        for (int i = 0; i < results.length; i++) {
-            blocks.add(results[i].getBlock());
-            locs.add(results[i].getLocations());
-        }
-        return new NDFSInputStream((Block[]) blocks.toArray(new 
Block[blocks.size()]), (DatanodeInfo[][]) locs.toArray(new 
DatanodeInfo[locs.size()][]));
+        return new NDFSInputStream(src.toString()); // stream constructor now fetches block info itself via openInfo()
     }
 
     /**
@@ -233,19 +226,20 @@
     class NDFSInputStream extends NFSInputStream {
         boolean closed = false;
 
+        private String src;
         private DataInputStream blockStream;
         private DataOutputStream partnerStream;
-        private Block blocks[];
-        private DatanodeInfo nodes[][];
+        private Block blocks[] = null;
+        private DatanodeInfo nodes[][] = null;
         private long pos = 0;
         private long filelen = 0;
         private long blockEnd = -1;
 
         /**
          */
-        public NDFSInputStream(Block blocks[], DatanodeInfo nodes[][]) throws 
IOException {
-            this.blocks = blocks;
-            this.nodes = nodes;
+        public NDFSInputStream(String src) throws IOException {
+            this.src = src;
+            openInfo();
             this.blockStream = null;
             this.partnerStream = null;
             for (int i = 0; i < blocks.length; i++) {
@@ -254,6 +248,35 @@
         }
 
         /**
+         * Grab the open-file info (blocks and their locations) for src from the namenode; on a re-fetch, throw IOException if the block list changed.
+         */
+        void openInfo() throws IOException {
+            Block oldBlocks[] = this.blocks;
+
+            LocatedBlock results[] = namenode.open(src);
+            Vector blockV = new Vector();
+            Vector nodeV = new Vector();
+            for (int i = 0; i < results.length; i++) {
+                blockV.add(results[i].getBlock());
+                nodeV.add(results[i].getLocations());
+            }
+            Block newBlocks[] = (Block[]) blockV.toArray(new Block[blockV.size()]);
+
+            if (oldBlocks != null) {
+                if (oldBlocks.length != newBlocks.length) { // check length first, or newBlocks[i] below can run off the end
+                    throw new IOException("Blocklist for " + src + " now has different length");
+                }
+                for (int i = 0; i < oldBlocks.length; i++) {
+                    if (! oldBlocks[i].equals(newBlocks[i])) {
+                        throw new IOException("Blocklist for " + src + " has changed!");
+                    }
+                }
+            }
+            this.blocks = newBlocks;
+            this.nodes = (DatanodeInfo[][]) nodeV.toArray(new DatanodeInfo[nodeV.size()][]);
+        }
+
+        /**
          * Open a DataInputStream to a DataNode so that it can be read from.
          * We get block ID and the IDs of the destinations at startup, from 
the namenode.
          */
@@ -292,6 +315,7 @@
             //
             // Connect to best DataNode for desired Block, with potential 
offset
             //
+            int failures = 0;
             InetSocketAddress targetAddr = null;
             Socket s = null;
             TreeSet deadNodes = new TreeSet();
@@ -302,12 +326,22 @@
                     chosenNode = bestNode(nodes[targetBlock], deadNodes);
                     targetAddr = 
DataNode.createSocketAddr(chosenNode.getName().toString());
                 } catch (IOException ie) {
-                    LOG.info("Could not obtain block from any node.  
Retrying...");
+                    /**
+                    if (failures >= MAX_BLOCK_ACQUIRE_FAILURES) {
+                        throw new IOException("Could not obtain block " + 
blocks[targetBlock]);
+                    }
+                    **/
+                    if (nodes[targetBlock] == null || 
nodes[targetBlock].length == 0) {
+                        LOG.info("No node available for block " + 
blocks[targetBlock]);
+                    }
+                    LOG.info("Could not obtain block from any node:  " + ie);
                     try {
                         Thread.sleep(10000);
                     } catch (InterruptedException iex) {
                     }
                     deadNodes.clear();
+                    openInfo();
+                    failures++;
                     continue;
                 }
                 try {


Reply via email to