Author: mc
Date: Mon Aug 1 11:15:30 2005
New Revision: 226851
URL: http://svn.apache.org/viewcvs?rev=226851&view=rev
Log:
The client now re-requests block information from the NameNode if
it cannot obtain a given block from any DataNode.
Also, add more LOG messages so we can see what's happening.
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java?rev=226851&r1=226850&r2=226851&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java Mon Aug 1 11:15:30 2005
@@ -34,6 +34,7 @@
public class NDFSClient implements FSConstants {
public static final Logger LOG =
LogFormatter.getLogger("org.apache.nutch.fs.NDFSClient");
static int BUFFER_SIZE = 4096;
+ static int MAX_BLOCK_ACQUIRE_FAILURES = 10;
ClientProtocol namenode;
boolean running = true;
Random r = new Random();
@@ -68,15 +69,7 @@
*/
public NFSInputStream open(UTF8 src) throws IOException {
// Get block info from namenode
- LocatedBlock results[] = namenode.open(src.toString());
-
- Vector blocks = new Vector();
- Vector locs = new Vector();
- for (int i = 0; i < results.length; i++) {
- blocks.add(results[i].getBlock());
- locs.add(results[i].getLocations());
- }
- return new NDFSInputStream((Block[]) blocks.toArray(new
Block[blocks.size()]), (DatanodeInfo[][]) locs.toArray(new
DatanodeInfo[locs.size()][]));
+ return new NDFSInputStream(src.toString());
}
/**
@@ -233,19 +226,20 @@
class NDFSInputStream extends NFSInputStream {
boolean closed = false;
+ private String src;
private DataInputStream blockStream;
private DataOutputStream partnerStream;
- private Block blocks[];
- private DatanodeInfo nodes[][];
+ private Block blocks[] = null;
+ private DatanodeInfo nodes[][] = null;
private long pos = 0;
private long filelen = 0;
private long blockEnd = -1;
/**
*/
- public NDFSInputStream(Block blocks[], DatanodeInfo nodes[][]) throws
IOException {
- this.blocks = blocks;
- this.nodes = nodes;
+ public NDFSInputStream(String src) throws IOException {
+ this.src = src;
+ openInfo();
this.blockStream = null;
this.partnerStream = null;
for (int i = 0; i < blocks.length; i++) {
@@ -254,6 +248,35 @@
}
/**
+ * Grab the open-file info from namenode
+ */
+ void openInfo() throws IOException {
+ Block oldBlocks[] = this.blocks;
+ // Fetch the current block list and per-block locations from the namenode.
+ LocatedBlock results[] = namenode.open(src);
+ Vector blockV = new Vector();
+ Vector nodeV = new Vector();
+ for (int i = 0; i < results.length; i++) {
+ blockV.add(results[i].getBlock());
+ nodeV.add(results[i].getLocations());
+ }
+ Block newBlocks[] = (Block[]) blockV.toArray(new Block[blockV.size()]);
+ // On a re-fetch only locations may change; check length first so a shorter list cannot index out of bounds.
+ if (oldBlocks != null) {
+ if (oldBlocks.length != newBlocks.length) {
+ throw new IOException("Blocklist for " + src + " now has different length");
+ }
+ for (int i = 0; i < oldBlocks.length; i++) {
+ if (! oldBlocks[i].equals(newBlocks[i])) {
+ throw new IOException("Blocklist for " + src + " has changed!");
+ }
+ }
+ }
+ this.blocks = newBlocks;
+ this.nodes = (DatanodeInfo[][]) nodeV.toArray(new DatanodeInfo[nodeV.size()][]);
+ }
+
+ /**
* Open a DataInputStream to a DataNode so that it can be read from.
* We get block ID and the IDs of the destinations at startup, from
the namenode.
*/
@@ -292,6 +315,7 @@
//
// Connect to best DataNode for desired Block, with potential
offset
//
+ int failures = 0;
InetSocketAddress targetAddr = null;
Socket s = null;
TreeSet deadNodes = new TreeSet();
@@ -302,12 +326,22 @@
chosenNode = bestNode(nodes[targetBlock], deadNodes);
targetAddr =
DataNode.createSocketAddr(chosenNode.getName().toString());
} catch (IOException ie) {
- LOG.info("Could not obtain block from any node.
Retrying...");
+ /**
+ if (failures >= MAX_BLOCK_ACQUIRE_FAILURES) {
+ throw new IOException("Could not obtain block " +
blocks[targetBlock]);
+ }
+ **/
+ if (nodes[targetBlock] == null ||
nodes[targetBlock].length == 0) {
+ LOG.info("No node available for block " +
blocks[targetBlock]);
+ }
+ LOG.info("Could not obtain block from any node: " + ie);
try {
Thread.sleep(10000);
} catch (InterruptedException iex) {
}
deadNodes.clear();
+ openInfo();
+ failures++;
continue;
}
try {