Author: cutting
Date: Thu Dec 1 12:28:49 2005
New Revision: 351462
URL: http://svn.apache.org/viewcvs?rev=351462&view=rev
Log:
Add TestNDFS, NUTCH-116. Contributed by Paul Baclace.
Added:
lucene/nutch/branches/mapred/src/test/org/apache/nutch/ndfs/
lucene/nutch/branches/mapred/src/test/org/apache/nutch/ndfs/TestNDFS.java
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Server.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Server.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Server.java?rev=351462&r1=351461&r2=351462&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Server.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Server.java Thu Dec 1 12:28:49 2005
@@ -95,7 +95,9 @@
}
try {
socket.close();
- } catch (IOException e) {}
+ } catch (IOException e) {
+ LOG.info(getName() + ": e=" + e);
+ }
LOG.info(getName() + ": exiting");
}
}
@@ -166,9 +168,9 @@
/** Handles queued calls. */
private class Handler extends Thread {
- public Handler() {
+ public Handler(int instanceNumber) {
this.setDaemon(true);
- this.setName("Server handler on " + port);
+ this.setName("Server handler "+ instanceNumber + " on " + port);
}
public void run() {
@@ -242,25 +244,31 @@
listener.start();
for (int i = 0; i < handlerCount; i++) {
- Handler handler = new Handler();
+ Handler handler = new Handler(i);
handler.start();
}
}
- /** Stops the service. No calls will be handled after this is called. All
- * threads will exit. */
+ /** Stops the service. No new calls will be handled after this is called. All
+ * subthreads will likely be finished after this returns.
+ */
public synchronized void stop() {
LOG.info("Stopping server on " + port);
running = false;
try {
- Thread.sleep(timeout); // let all threads exit
+ Thread.sleep(timeout); // inexactly wait for pending requests to finish
} catch (InterruptedException e) {}
- notify();
+ notifyAll();
}
- /** Wait for the server to be stopped. */
+ /** Wait for the server to be stopped.
+ * Does not wait for all subthreads to finish.
+ * See {@link #stop()}.
+ */
public synchronized void join() throws InterruptedException {
- wait();
+ while (running) {
+ wait();
+ }
}
/** Called for each call. */
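
The join() change above replaces a bare wait() with the standard guarded-wait
idiom: a bare wait() can wake spuriously or run after a notify() that fired
before the waiter arrived, while looping on the running flag and using
notifyAll() wakes every joiner exactly when the server actually stops. A
minimal sketch of the idiom, outside this patch (class and field names are
illustrative):

    public class StoppableService {
        private boolean running = true;

        public synchronized void stop() {
            running = false;
            notifyAll();          // wake every thread blocked in join()
        }

        public synchronized void join() throws InterruptedException {
            while (running) {     // guard against spurious wakeups
                wait();           // releases the monitor while waiting
            }
        }
    }
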
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java?rev=351462&r1=351461&r2=351462&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java Thu Dec 1 12:28:49 2005
@@ -37,7 +37,7 @@
**********************************************************/
public class DataNode implements FSConstants, Runnable {
public static final Logger LOG =
LogFormatter.getLogger("org.apache.nutch.ndfs.DataNode");
- //
+ //
// REMIND - mjc - I might bring "maxgigs" back so user can place
// artificial limit on space
//private static final long GIGABYTE = 1024 * 1024 * 1024;
@@ -59,6 +59,8 @@
return new InetSocketAddress(host, port);
}
+
+ private static Vector subThreadList = null;
DatanodeProtocol namenode;
FSDataset data;
String localName;
@@ -66,6 +68,8 @@
Vector receivedBlockList = new Vector();
int xmitsInProgress = 0;
Daemon dataXceiveServer = null;
+ long blockReportInterval;
+ private long datanodeStartupPeriod;
private NutchConf fConf;
/**
@@ -98,6 +102,13 @@
this.localName = machineName + ":" + tmpPort;
this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));
this.dataXceiveServer.start();
+
+ long blockReportIntervalBasis =
+ conf.getLong("ndfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
+ this.blockReportInterval =
+ blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis/10));
+ this.datanodeStartupPeriod =
+ conf.getLong("ndfs.datanode.startupMsec", DATANODE_STARTUP_PERIOD);
}
/**
@@ -109,6 +120,7 @@
/**
* Shut down this instance of the datanode.
+ * Returns only after shutdown is complete.
*/
void shutdown() {
this.shouldRun = false;
@@ -127,8 +139,7 @@
long lastHeartbeat = 0, lastBlockReport = 0;
long sendStart = System.currentTimeMillis();
int heartbeatsSent = 0;
- long blockReportInterval =
- BLOCKREPORT_INTERVAL - new Random().nextInt((int)(BLOCKREPORT_INTERVAL/10));
+ LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval +
"msec");
//
// Now loop for a long time....
@@ -181,7 +192,7 @@
// to pass from the time of connection to the first block-transfer.
// Otherwise we transfer a lot of blocks unnecessarily.
//
- if (now - sendStart > DATANODE_STARTUP_PERIOD) {
+ if (now - sendStart > datanodeStartupPeriod) {
//
// Check to see if there are any block-instructions from the
// namenode that this datanode should perform.
@@ -651,35 +662,92 @@
offerService();
} catch (Exception ex) {
LOG.info("Exception: " + ex);
+ if (shouldRun) {
LOG.info("Lost connection to namenode. Retrying...");
try {
- Thread.sleep(5000);
+ Thread.sleep(5000);
} catch (InterruptedException ie) {
}
+ }
}
}
+ LOG.info("Finishing DataNode in: "+data.data);
}
- /**
+ /** Start datanode daemons.
+ * Start a datanode daemon for each comma separated data directory
+ * specified in property ndfs.data.dir
*/
public static void run(NutchConf conf) throws IOException {
String[] dataDirs = conf.getStrings("ndfs.data.dir");
+ subThreadList = new Vector(dataDirs.length);
for (int i = 0; i < dataDirs.length; i++) {
- String dataDir = dataDirs[i];
- File data = new File(dataDir);
- data.mkdirs();
- if (!data.isDirectory()) {
- LOG.warning("Can't start DataNode in non-directory: "+dataDir);
- continue;
- }
- new Thread(new DataNode(conf, dataDir), "DataNode: "+dataDir).start();
+ DataNode dn = makeInstanceForDir(dataDirs[i], conf);
+ if (dn != null) {
+ Thread t = new Thread(dn, "DataNode: "+dataDirs[i]);
+ t.setDaemon(true); // needed for JUnit testing
+ t.start();
+ subThreadList.add(t);
+ }
}
}
- /**
+ /** Start datanode daemons.
+ * Start a datanode daemon for each comma separated data directory
+ * specified in property ndfs.data.dir and wait for them to finish.
+ * If this thread is specifically interrupted, it will stop waiting.
+ */
+ private static void runAndWait(NutchConf conf) throws IOException {
+ run(conf);
+
+ // Wait for sub threads to exit
+ for (Iterator iterator = subThreadList.iterator(); iterator.hasNext();) {
+ Thread threadDataNode = (Thread) iterator.next();
+ try {
+ threadDataNode.join();
+ } catch (InterruptedException e) {
+ if (Thread.currentThread().isInterrupted()) {
+ // did someone knock?
+ return;
+ }
+ }
+ }
+ }
+
+ /**
+ * Make an instance of DataNode after ensuring that given data directory
+ * (and parent directories, if necessary) can be created.
+ * @param dataDir where the new DataNode instance should keep its files.
+ * @param conf NutchConf instance to use.
+ * @return DataNode instance for given data dir and conf, or null if directory
+ * cannot be created.
+ * @throws IOException
+ */
+ static DataNode makeInstanceForDir(String dataDir, NutchConf conf) throws IOException {
+ DataNode dn = null;
+ File data = new File(dataDir);
+ data.mkdirs();
+ if (!data.isDirectory()) {
+ LOG.warning("Can't start DataNode in non-directory: "+dataDir);
+ return null;
+ } else {
+ dn = new DataNode(conf, dataDir);
+ }
+ return dn;
+ }
+
+ public String toString() {
+ return "DataNode{" +
+ "data=" + data +
+ ", localName='" + localName + "'" +
+ ", xmitsInProgress=" + xmitsInProgress +
+ "}";
+ }
+
+ /**
*/
public static void main(String args[]) throws IOException {
LogFormatter.setShowThreadIDs(true);
- run(NutchConf.get());
+ runAndWait(NutchConf.get());
}
}
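
The DataNode constructor change above reads the block-report basis from
ndfs.blockreport.intervalMsec and keeps the existing jitter: subtracting a
random offset of up to 10% of the basis staggers the reports so datanodes
started together do not all hit the namenode at once. A minimal sketch of
that computation (the one-hour basis is illustrative, not the patch's
default):

    import java.util.Random;

    public class JitterSketch {
        public static void main(String[] args) {
            long basis = 60 * 60 * 1000L; // illustrative basis: 1 hour in msec
            // Subtract up to 10% of the basis, as the DataNode constructor does.
            long interval = basis - new Random().nextInt((int) (basis / 10));
            System.out.println("block report interval = " + interval + " msec");
        }
    }
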
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java?rev=351462&r1=351461&r2=351462&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java Thu Dec 1 12:28:49 2005
@@ -29,7 +29,8 @@
***************************************************/
public class FSDataset implements FSConstants {
static final double USABLE_DISK_PCT = 0.98;
- /**
+
+ /**
* A node type that can be built into a tree reflecting the
* hierarchy of blocks on the local disk.
*/
@@ -166,6 +167,13 @@
blkid = blkid >> ((15 - halfByteIndex) * 4);
return (int) ((0x000000000000000F) & blkid);
}
+
+ public String toString() {
+ return "FSDir{" +
+ "dir=" + dir +
+ ", children=" + (children == null ? null :
Arrays.asList(children)) +
+ "}";
+ }
}
//////////////////////////////////////////////////////
@@ -411,4 +419,11 @@
// REMIND - mjc - should cache this result for performance
return new File(tmp, b.getBlockName());
}
+
+ public String toString() {
+ return "FSDataset{" +
+ "dirpath='" + dirpath + "'" +
+ "}";
+ }
+
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java?rev=351462&r1=351461&r2=351462&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java Thu Dec 1 12:28:49 2005
@@ -53,6 +53,9 @@
// Whether we should use disk-availability info when determining target
final static boolean USE_AVAILABILITY =
NutchConf.get().getBoolean("ndfs.availability.allocation", false);
+ private boolean allowSameHostTargets =
+ NutchConf.get().getBoolean("test.ndfs.same.host.targets.allowed",
false);
+
//
// Stores the correct file name hierarchy
//
@@ -127,14 +130,14 @@
// Store set of Blocks that need to be replicated 1 or more times.
// We also store pending replication-orders.
//
- TreeSet neededReplications = new TreeSet();
- TreeSet pendingReplications = new TreeSet();
+ private TreeSet neededReplications = new TreeSet();
+ private TreeSet pendingReplications = new TreeSet();
//
// Used for handling lock-leases
//
- TreeMap leases = new TreeMap();
- TreeSet sortedLeases = new TreeSet();
+ private TreeMap leases = new TreeMap();
+ private TreeSet sortedLeases = new TreeSet();
//
// Threaded object that checks to see if we have been
@@ -159,17 +162,23 @@
this.systemStart = System.currentTimeMillis();
}
- /**
+ /** Close down this filesystem manager.
+ * Causes heartbeat and lease daemons to stop; waits briefly for
+ * them to finish, but a short timeout returns control back to caller.
*/
public void close() {
+ synchronized (this) {
fsRunning = false;
+ }
try {
- hbthread.join();
- } catch (InterruptedException ie) {
- }
- try {
- lmthread.join();
+ hbthread.join(3000);
} catch (InterruptedException ie) {
+ } finally {
+ // using finally to ensure we also wait for lease daemon
+ try {
+ lmthread.join(3000);
+ } catch (InterruptedException ie) {
+ }
}
}
@@ -218,6 +227,9 @@
* of machines. The first on this list should be where the client
* writes data. Subsequent items in the list must be provided in
* the connection to the first datanode.
+ * @return an array that consists of the block, plus a set
+ * of machines, or null if src is invalid for creation (based on
+ * {@link FSDirectory#isValidToCreate(UTF8)}).
*/
public synchronized Object[] startFile(UTF8 src, UTF8 holder, boolean overwrite) {
Object results[] = null;
@@ -234,7 +246,8 @@
// Get the array of replication targets
DatanodeInfo targets[] = chooseTargets(DESIRED_REPLICATION, null);
if (targets.length < MIN_REPLICATION) {
- LOG.info("Target-length is " + targets.length + ", below
MIN_REPLICATION (" + MIN_REPLICATION + ")");
+ LOG.warning("Target-length is " + targets.length +
+ ", below MIN_REPLICATION (" + MIN_REPLICATION + ")");
return null;
}
@@ -257,9 +270,11 @@
// Create next block
results[0] = allocateBlock(src);
results[1] = targets;
+ } else { // ! fileValid
+ LOG.warning("Cannot start file because it is invalid. src=" +
src);
}
} else {
- LOG.info("Cannot start file because pendingCreates is non-null");
+ LOG.warning("Cannot start file because pendingCreates is non-null.
src=" + src);
}
return results;
}
@@ -1172,10 +1187,13 @@
}
}
-
/**
- * Get a certain number of targets, if possible. If not,
- * return as many as we can.
+ * Get a certain number of targets, if possible.
+ * If not, return as many as we can.
+ * @param desiredReplicates number of duplicates wanted.
+ * @param forbiddenNodes set of DatanodeInfo instances that should not be
+ * considered targets.
+ * @return array of DatanodeInfo instances used as targets.
*/
DatanodeInfo[] chooseTargets(int desiredReplicates, TreeSet forbiddenNodes) {
TreeSet alreadyChosen = new TreeSet();
@@ -1187,7 +1205,7 @@
targets.add(target);
alreadyChosen.add(target);
} else {
- break;
+ break; // calling chooseTarget again won't help
}
}
return (DatanodeInfo[]) targets.toArray(new DatanodeInfo[targets.size()]);
@@ -1200,6 +1218,10 @@
* Right now it chooses randomly from available boxes. In future could
* choose according to capacity and load-balancing needs (or even
* network-topology, to avoid inter-switch traffic).
+ * @param forbidden1 DatanodeInfo targets not allowed, null allowed.
+ * @param forbidden2 DatanodeInfo targets not allowed, null allowed.
+ * @return DatanodeInfo instance to use or null if something went wrong
+ * (a log message is emitted if null is returned).
*/
DatanodeInfo chooseTarget(TreeSet forbidden1, TreeSet forbidden2) {
//
@@ -1207,27 +1229,40 @@
//
int totalMachines = datanodeMap.size();
if (totalMachines == 0) {
- LOG.info("While choosing target, totalMachines is " +
totalMachines);
+ LOG.warning("While choosing target, totalMachines is " +
totalMachines);
return null;
}
+ TreeSet forbiddenMachines = new TreeSet();
//
// In addition to already-chosen datanode/port pairs, we want to avoid
// already-chosen machinenames. (There can be multiple datanodes per
// machine.) We might relax this requirement in the future, though. (Maybe
// so that at least one replicate is off the machine.)
//
- TreeSet forbiddenMachines = new TreeSet();
+ UTF8 hostOrHostAndPort = null;
if (forbidden1 != null) {
+ // add name [and host] of all elements in forbidden1 to forbiddenMachines
for (Iterator it = forbidden1.iterator(); it.hasNext(); ) {
DatanodeInfo cur = (DatanodeInfo) it.next();
- forbiddenMachines.add(cur.getName());
+ if (allowSameHostTargets) {
+ hostOrHostAndPort = cur.getName(); // forbid same host:port
+ } else {
+ hostOrHostAndPort = cur.getHost(); // forbid same host
+ }
+ forbiddenMachines.add(hostOrHostAndPort);
}
}
if (forbidden2 != null) {
+ // add name [and host] of all elements in forbidden2 to forbiddenMachines
for (Iterator it = forbidden2.iterator(); it.hasNext(); ) {
DatanodeInfo cur = (DatanodeInfo) it.next();
- forbiddenMachines.add(cur.getName());
+ if (allowSameHostTargets) {
+ hostOrHostAndPort = cur.getName(); // forbid same host:port
+ } else {
+ hostOrHostAndPort = cur.getHost(); // forbid same host
+ }
+ forbiddenMachines.add(hostOrHostAndPort);
}
}
@@ -1238,9 +1273,12 @@
Vector targetList = new Vector();
for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
DatanodeInfo node = (DatanodeInfo) it.next();
- if ((forbidden1 == null || ! forbidden1.contains(node)) &&
- (forbidden2 == null || ! forbidden2.contains(node)) &&
- (! forbiddenMachines.contains(node.getName()))) {
+ if (allowSameHostTargets) {
+ hostOrHostAndPort = node.getName(); // match host:port
+ } else {
+ hostOrHostAndPort = node.getHost(); // match host
+ }
+ if (! forbiddenMachines.contains(hostOrHostAndPort)) {
targetList.add(node);
totalRemaining += node.getRemaining();
}
@@ -1250,6 +1288,11 @@
// Now pick one
//
if (targetList.size() == 0) {
+ LOG.warning("Zero targets found, forbidden1.size=" +
+ ( forbidden1 != null ? forbidden1.size() : 0 ) +
+ " allowSameHostTargets=" + allowSameHostTargets +
+ " forbidden2.size()=" +
+ ( forbidden2 != null ? forbidden2.size() : 0 ));
return null;
} else if (! USE_AVAILABILITY) {
int target = r.nextInt(targetList.size());
@@ -1266,7 +1309,7 @@
}
}
- LOG.info("Impossible state. When trying to choose target node,
could not find any. This may indicate that datanode capacities are being
updated during datanode selection. Anyway, now returning an arbitrary target
to recover...");
+ LOG.warning("Impossible state. When trying to choose target node,
could not find any. This may indicate that datanode capacities are being
updated during datanode selection. Anyway, now returning an arbitrary target
to recover...");
return (DatanodeInfo) targetList.elementAt(r.nextInt(targetList.size()));
}
}
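
The chooseTarget() changes above key the forbidden set on the datanode's bare
host rather than its host:port name, unless the test-only property
test.ndfs.same.host.targets.allowed is set, so production replicas land on
distinct machines while a single-host JUnit run can still reach the desired
replication. A minimal sketch of the key selection, with a plain host:port
string standing in for DatanodeInfo (names are illustrative):

    import java.util.TreeSet;

    public class TargetKeySketch {
        // Key on host:port to forbid one datanode; key on the bare host to
        // forbid every datanode on that machine.
        static String keyFor(String name, boolean allowSameHostTargets) {
            return allowSameHostTargets ? name : name.substring(0, name.indexOf(':'));
        }

        public static void main(String[] args) {
            TreeSet forbidden = new TreeSet();
            forbidden.add(keyFor("nodeA:7000", false));
            // With allowSameHostTargets=false, another datanode on nodeA is
            // also excluded:
            System.out.println(forbidden.contains(keyFor("nodeA:7001", false)));
        }
    }
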
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java?rev=351462&r1=351461&r2=351462&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java Thu Dec 1 12:28:49 2005
@@ -38,8 +38,12 @@
public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
public static final Logger LOG =
LogFormatter.getLogger("org.apache.nutch.ndfs.NameNode");
- FSNamesystem namesystem;
- Server server;
+ private FSNamesystem namesystem;
+ private Server server;
+ private int handlerCount = 2;
+
+ /** only used for testing purposes */
+ private boolean stopRequested = false;
/**
* Create a NameNode at the default location
@@ -52,24 +56,40 @@
}
/**
- * Create a NameNode at the specified location
+ * Create a NameNode at the specified location and start it.
*/
public NameNode(File dir, int port) throws IOException {
this.namesystem = new FSNamesystem(dir);
- this.server = RPC.getServer(this, port, 10, false);
+ this.handlerCount =
+ NutchConf.get().getInt("ndfs.namenode.handler.count", 10);
+ this.server = RPC.getServer(this, port, handlerCount, false);
this.server.start();
}
/**
- * Run forever
+ * Wait for service to finish.
+ * (Normally, it runs forever.)
*/
- public void offerService() {
+ public void join() {
try {
this.server.join();
} catch (InterruptedException ie) {
}
}
+ /**
+ * Stop all NameNode threads and wait for all to finish.
+ * Package-only access since this is intended for JUnit testing.
+ */
+ void stop() {
+ if (! stopRequested) {
+ stopRequested = true;
+ namesystem.close();
+ server.stop();
+ //this.join();
+ }
+ }
+
/////////////////////////////////////////////////////
// ClientProtocol
/////////////////////////////////////////////////////
@@ -78,7 +98,7 @@
public LocatedBlock[] open(String src) throws IOException {
Object openResults[] = namesystem.open(new UTF8(src));
if (openResults == null) {
- throw new IOException("Cannot find filename " + src);
+ throw new IOException("Cannot open filename " + src);
} else {
Block blocks[] = (Block[]) openResults[0];
DatanodeInfo sets[][] = (DatanodeInfo[][]) openResults[1];
@@ -95,7 +115,7 @@
public LocatedBlock create(String src, String clientName, boolean overwrite) throws IOException {
Object results[] = namesystem.startFile(new UTF8(src), new UTF8(clientName), overwrite);
if (results == null) {
- throw new IOException("Cannot create file " + src);
+ throw new IOException("Cannot create file " + src + " on client "
+ clientName);
} else {
Block b = (Block) results[0];
DatanodeInfo targets[] = (DatanodeInfo[]) results[1];
@@ -324,6 +344,6 @@
*/
public static void main(String argv[]) throws IOException, InterruptedException {
NameNode namenode = new NameNode();
- namenode.offerService();
+ namenode.join();
}
}
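
The new NameNode.stop() above is guarded by the stopRequested flag so that
repeated calls, for instance from a test's finally clause and a later cleanup
pass, close the namesystem and RPC server only once. A minimal sketch of the
stop-once pattern (names are illustrative; synchronized is added here as an
extra precaution the patch itself does not take):

    public class StopOnce {
        private boolean stopRequested = false;

        synchronized void stop() {
            if (!stopRequested) {
                stopRequested = true;
                System.out.println("shutting down subsystems exactly once");
            }
        }

        public static void main(String[] args) {
            StopOnce s = new StopOnce();
            s.stop();
            s.stop(); // second call is a no-op
        }
    }
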
Added: lucene/nutch/branches/mapred/src/test/org/apache/nutch/ndfs/TestNDFS.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/test/org/apache/nutch/ndfs/TestNDFS.java?rev=351462&view=auto
==============================================================================
--- lucene/nutch/branches/mapred/src/test/org/apache/nutch/ndfs/TestNDFS.java (added)
+++ lucene/nutch/branches/mapred/src/test/org/apache/nutch/ndfs/TestNDFS.java Thu Dec 1 12:28:49 2005
@@ -0,0 +1,530 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.ndfs;
+
+import junit.framework.TestCase;
+import junit.framework.AssertionFailedError;
+import org.apache.nutch.fs.NFSInputStream;
+import org.apache.nutch.fs.NFSOutputStream;
+import org.apache.nutch.fs.FileUtil;
+import org.apache.nutch.io.UTF8;
+import org.apache.nutch.util.LogFormatter;
+import org.apache.nutch.util.NutchConf;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.ListIterator;
+import java.util.logging.Logger;
+import java.util.Random;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Test NDFS.
+ * TestNDFS is a JUnit test for NDFS using "pseudo multiprocessing" (or
+ * more strictly, pseudo distributed), meaning all daemons run in one process
+ * and sockets are used to communicate between daemons. The test permutes
+ * various block sizes, number of files, file sizes, and number of
+ * datanodes. After creating 1 or more files and filling them with random
+ * data, one datanode is shut down, and then the files are verified.
+ * Next, all the random test files are deleted and we test for leakage
+ * (non-deletion) by directly checking the real directories corresponding
+ * to the datanodes still running.
+ * <p>
+ * Usage notes: TEST_PERMUTATION_MAX can be adjusted to perform more or
+ * less testing of permutations. The ceiling of useful permutation is
+ * TEST_PERMUTATION_MAX_CEILING.
+ * <p>
+ * NDFSClient emits many messages that can be ignored like:
+ * "Failed to connect to *:7000:java.net.ConnectException: Connection refused:
connect"
+ * because a datanode is forced to close during testing.
+ * <p>
+ * Warnings about "Zero targets found" can be ignored (these are naggingly
+ * emitted even though it is not possible to achieve the desired replication
+ * level with the number of active datanodes.)
+ * <p>
+ * Possible Extensions:
+ * <p>Bring a datanode down and restart it to verify reconnection to namenode.
+ * <p>Simulate running out of disk space on one datanode only.
+ * <p>Bring the namenode down and restart it to verify that datanodes reconnect.
+ * <p>
+ * <p>For another approach to filesystem testing, see the high level
+ * (NutchFS level) test {@link org.apache.nutch.fs.TestNutchFileSystem}.
+ * @author Paul Baclace
+ */
+public class TestNDFS extends TestCase implements FSConstants {
+ private static final Logger LOG =
+ LogFormatter.getLogger("org.apache.nutch.ndfs.TestNDFS");
+
+ private static int BUFFER_SIZE =
+ NutchConf.get().getInt("io.file.buffer.size", 4096);
+
+ private static int testCycleNumber = 0;
+
+ /**
+ * all NDFS test files go under this base directory
+ */
+ private static String baseDirSpecified;
+
+ /**
+ * base dir as File
+ */
+ private static File baseDir;
+
+ /** NDFS block sizes to permute over in multiple test cycles
+ * (array length should be prime).
+ */
+ private static final int[] BLOCK_SIZES = {100000, 4096};
+
+ /** NDFS file sizes to permute over in multiple test cycles
+ * (array length should be prime).
+ */
+ private static final int[] FILE_SIZES =
+ {100000, 100001, 4095, 4096, 4097, 1000000, 1000001};
+
+ /** NDFS file counts to permute over in multiple test cycles
+ * (array length should be prime).
+ */
+ private static final int[] FILE_COUNTS = {1, 10, 100};
+
+ /** Number of useful permutations or test cycles.
+ * (The 2 factor represents the alternating 2 or 3 number of datanodes
+ * started.)
+ */
+ private static final int TEST_PERMUTATION_MAX_CEILING =
+ BLOCK_SIZES.length * FILE_SIZES.length * FILE_COUNTS.length * 2;
+
+ /** Number of permutations of NDFS test parameters to perform.
+ * If this is greater than ceiling TEST_PERMUTATION_MAX_CEILING, then the
+ * ceiling value is used.
+ */
+ private static final int TEST_PERMUTATION_MAX = 3;
+ private Constructor randomDataGeneratorCtor = null;
+
+ static {
+ baseDirSpecified = System.getProperty("test.ndfs.data", "/tmp/ndfs_test");
+ baseDir = new File(baseDirSpecified);
+ }
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ NutchConf.get().setBoolean("test.ndfs.same.host.targets.allowed", true);
+ }
+
+ /**
+ * Remove old files from temp area used by this test case and be sure
+ * base temp directory can be created.
+ */
+ protected void prepareTempFileSpace() {
+ if (baseDir.exists()) {
+ try { // start from a blank slate
+ FileUtil.fullyDelete(baseDir);
+ } catch (Exception ignored) {
+ }
+ }
+ baseDir.mkdirs();
+ if (!baseDir.isDirectory()) {
+ throw new RuntimeException("Value of root directory property test.ndfs.data for ndfs test is not a directory: "
+ + baseDirSpecified);
+ }
+ }
+
+ /**
+ * Pseudo Distributed FS Test.
+ * Test NDFS by running all the necessary daemons in one process.
+ * Test various block sizes, number of files, disk space consumption,
+ * and leakage.
+ *
+ * @throws Exception
+ */
+ public void testFsPseudoDistributed()
+ throws Exception {
+ while (testCycleNumber < TEST_PERMUTATION_MAX &&
+ testCycleNumber < TEST_PERMUTATION_MAX_CEILING) {
+ int blockSize = BLOCK_SIZES[testCycleNumber % BLOCK_SIZES.length];
+ int numFiles = FILE_COUNTS[testCycleNumber % FILE_COUNTS.length];
+ int fileSize = FILE_SIZES[testCycleNumber % FILE_SIZES.length];
+ prepareTempFileSpace();
+ testFsPseudoDistributed(fileSize, numFiles, blockSize,
+ (testCycleNumber % 2) + 2);
+ }
+ }
+
+ /**
+ * Pseudo Distributed FS Testing.
+ * Do one test cycle with given parameters.
+ *
+ * @param nBytes number of bytes to write to each file.
+ * @param numFiles number of files to create.
+ * @param blockSize block size to use for this test cycle.
+ * @param initialDNcount number of datanodes to create
+ * @throws Exception
+ */
+ public void testFsPseudoDistributed(long nBytes, int numFiles,
+ int blockSize, int initialDNcount)
+ throws Exception {
+ long startTime = System.currentTimeMillis();
+ int bufferSize = Math.min(BUFFER_SIZE, blockSize);
+ boolean checkDataDirsEmpty = false;
+ int iDatanodeClosed = 0;
+ Random randomDataGenerator = makeRandomDataGenerator();
+ final int currentTestCycleNumber = testCycleNumber;
+ msg("using randomDataGenerator=" +
randomDataGenerator.getClass().getName());
+
+ //
+ // modify config for test
+
+ //
+ // set given config param to override other config settings
+ NutchConf.get().setInt("test.ndfs.block_size", blockSize);
+ // verify that config changed
+ assertTrue(blockSize == NutchConf.get().getInt("test.ndfs.block_size", 2)); // 2 is an intentional obviously-wrong block size
+ // downsize for testing (just to save resources)
+ NutchConf.get().setInt("ndfs.namenode.handler.count", 3);
+ if (false) { // use MersenneTwister, if present
+ NutchConf.get().set("nutch.random.class",
+ "org.apache.nutch.util.MersenneTwister");
+ }
+ NutchConf.get().setLong("ndfs.blockreport.intervalMsec", 50*1000L);
+ NutchConf.get().setLong("ndfs.datanode.startupMsec", 15*1000L);
+
+ String nameFSDir = baseDirSpecified + "/name";
+ msg("----Start Test Cycle=" + currentTestCycleNumber +
+ " test.ndfs.block_size=" + blockSize +
+ " nBytes=" + nBytes +
+ " numFiles=" + numFiles +
+ " initialDNcount=" + initialDNcount);
+
+ //
+ // start a NameNode
+
+ int nameNodePort = 9000 + testCycleNumber++; // ToDo: settable base port
+ String nameNodeSocketAddr = "localhost:" + nameNodePort;
+ NameNode nameNodeDaemon = new NameNode(new File(nameFSDir), nameNodePort);
+ NDFSClient ndfsClient = null;
+ try {
+ //
+ // start some DataNodes
+ //
+ ArrayList listOfDataNodeDaemons = new ArrayList();
+ NutchConf conf = NutchConf.get();
+ conf.set("fs.default.name", nameNodeSocketAddr);
+ for (int i = 0; i < initialDNcount; i++) {
+ // uniquely config real fs path for data storage for this datanode
+ String dataDir = baseDirSpecified + "/datanode" + i;
+ conf.set("ndfs.data.dir", dataDir);
+ DataNode dn = DataNode.makeInstanceForDir(dataDir, conf);
+ if (dn != null) {
+ listOfDataNodeDaemons.add(dn);
+ (new Thread(dn, "DataNode" + i + ": " + dataDir)).start();
+ }
+ }
+ try {
+ assertTrue("insufficient datanodes for test to continue",
+ (listOfDataNodeDaemons.size() >= 2));
+
+ //
+ // wait for datanodes to report in
+ awaitQuiescence();
+
+ // act as if namenode is a remote process
+ ndfsClient = new NDFSClient(new InetSocketAddress("localhost", nameNodePort));
+
+ //
+ // write nBytes of data using randomDataGenerator to numFiles
+ //
+ ArrayList testfilesList = new ArrayList();
+ byte[] buffer = new byte[bufferSize];
+ UTF8 testFileName = null;
+ for (int iFileNumber = 0; iFileNumber < numFiles; iFileNumber++) {
+ testFileName = new UTF8("/f" + iFileNumber);
+ testfilesList.add(testFileName);
+ NFSOutputStream nos = ndfsClient.create(testFileName, false);
+ try {
+ for (long nBytesWritten = 0L;
+ nBytesWritten < nBytes;
+ nBytesWritten += buffer.length) {
+ if ((nBytesWritten + buffer.length) > nBytes) {
+ // calculate byte count needed to exactly hit nBytes in length
+ // to keep randomDataGenerator in sync during the verify step
+ int pb = (int) (nBytes - nBytesWritten);
+ byte[] bufferPartial = new byte[pb];
+ randomDataGenerator.nextBytes(bufferPartial);
+ nos.write(bufferPartial);
+ } else {
+ randomDataGenerator.nextBytes(buffer);
+ nos.write(buffer);
+ }
+ }
+ } finally {
+ nos.flush();
+ nos.close();
+ }
+ }
+
+ //
+ // No need to wait for blocks to be replicated because replication
+ // is supposed to be complete when the file is closed.
+ //
+
+ //
+ // take one datanode down
+ iDatanodeClosed =
+ currentTestCycleNumber % listOfDataNodeDaemons.size();
+ DataNode dn = (DataNode) listOfDataNodeDaemons.get(iDatanodeClosed);
+ msg("shutdown datanode daemon " + iDatanodeClosed +
+ " dn=" + dn.data);
+ try {
+ dn.shutdown();
+ } catch (Exception e) {
+ msg("ignoring datanode shutdown exception=" + e);
+ }
+
+ //
+ // verify data against a "rewound" randomDataGenerator
+ // that all of the data is intact
+ long lastLong = randomDataGenerator.nextLong();
+ randomDataGenerator = makeRandomDataGenerator(); // restart (make new) PRNG
+ ListIterator li = testfilesList.listIterator();
+ while (li.hasNext()) {
+ testFileName = (UTF8) li.next();
+ NFSInputStream nis = ndfsClient.open(testFileName);
+ byte[] bufferGolden = new byte[bufferSize];
+ int m = 42;
+ try {
+ while (m != -1) {
+ m = nis.read(buffer);
+ if (m == buffer.length) {
+ randomDataGenerator.nextBytes(bufferGolden);
+ assertBytesEqual(buffer, bufferGolden, buffer.length);
+ } else if (m > 0) {
+ byte[] bufferGoldenPartial = new byte[m];
+ randomDataGenerator.nextBytes(bufferGoldenPartial);
+ assertBytesEqual(buffer, bufferGoldenPartial, bufferGoldenPartial.length);
+ }
+ }
+ } finally {
+ nis.close();
+ }
+ }
+ // verify last randomDataGenerator rand val to ensure last file length was checked
+ long lastLongAgain = randomDataGenerator.nextLong();
+ assertEquals(lastLong, lastLongAgain);
+ msg("Finished validating all file contents");
+
+ //
+ // now delete all the created files
+ msg("Delete all random test files under NDFS via remaining datanodes");
+ li = testfilesList.listIterator();
+ while (li.hasNext()) {
+ testFileName = (UTF8) li.next();
+ assertTrue(ndfsClient.delete(testFileName));
+ }
+
+ //
+ // wait for delete to be propagated
+ // (unlike writing files, delete is lazy)
+ msg("Test thread sleeping while datanodes propagate delete...");
+ awaitQuiescence();
+ msg("Test thread awakens to verify file contents");
+
+ //
+ // check that the datanode's block directory is empty
+ // (except for datanode that had forced shutdown)
+ checkDataDirsEmpty = true; // do it during finally clause
+
+ } catch (AssertionFailedError afe) {
+ throw afe;
+ } catch (Throwable t) {
+ msg("Unexpected exception_b: " + t);
+ t.printStackTrace();
+ } finally {
+ //
+ // shut down datanode daemons (this takes advantage of being same-process)
+ msg("begin shutdown of all datanode daemons for test cycle " +
+ currentTestCycleNumber);
+
+ for (int i = 0; i < listOfDataNodeDaemons.size(); i++) {
+ DataNode dataNode = (DataNode) listOfDataNodeDaemons.get(i);
+ if (i != iDatanodeClosed) {
+ try {
+ if (checkDataDirsEmpty) {
+ File dataDir = new File(dataNode.data.dirpath);
+ assertNoBlocks(dataDir);
+
+ }
+ dataNode.shutdown();
+ } catch (Exception e) {
+ msg("ignoring exception during (all) datanode shutdown, e=" + e);
+ }
+ }
+ }
+ }
+ msg("finished shutdown of all datanode daemons for test cycle " +
+ currentTestCycleNumber);
+ if (ndfsClient != null) {
+ try {
+ msg("close down subthreads of NDFSClient");
+ ndfsClient.close();
+ } catch (Exception ignored) { }
+ msg("finished close down of NDFSClient");
+ }
+ } catch (AssertionFailedError afe) {
+ throw afe;
+ } catch (Throwable t) {
+ msg("Unexpected exception_a: " + t);
+ t.printStackTrace();
+ } finally {
+ // shut down namenode daemon (this takes advantage of being same-process)
+ msg("begin shutdown of namenode daemon for test cycle " +
+ currentTestCycleNumber);
+ try {
+ nameNodeDaemon.stop();
+ } catch (Exception e) {
+ msg("ignoring namenode shutdown exception=" + e);
+ }
+ msg("finished shutdown of namenode daemon for test cycle " +
+ currentTestCycleNumber);
+ }
+ msg("test cycle " + currentTestCycleNumber + " elapsed time=" +
+ (System.currentTimeMillis() - startTime) / 1000. + "sec");
+ msg("threads still running (look for stragglers): ");
+ msg(summarizeThreadGroup());
+ }
+
+ private void assertNoBlocks(File datanodeDir) {
+ File datanodeDataDir = new File(datanodeDir, "data");
+ String[] blockFilenames =
+ datanodeDataDir.list(
+ new FilenameFilter() {
+ public boolean accept(File dir, String name){
+ return Block.isBlockFilename(new File(dir, name));}});
+ // if this fails, the delete did not propagate because either
+ // awaitQuiescence() returned before the disk images were removed
+ // or a real failure was detected.
+ assertTrue(" data dir not empty: " + datanodeDataDir,
+ blockFilenames.length==0);
+ }
+
+ /**
+ * Make a data generator.
+ * Allows optional use of high quality PRNG by setting property
+ * nutch.random.class to the full class path of a subclass of
+ * java.util.Random such as "...util.MersenneTwister".
+ * The property test.ndfs.random.seed can supply a seed for reproducible
+ * testing (a default is set here if property is not set.)
+ */
+ private Random makeRandomDataGenerator() {
+ long seed = NutchConf.get().getLong("test.ndfs.random.seed", 0xB437EF);
+ try {
+ if (randomDataGeneratorCtor == null) {
+ // lazy init
+ String rndDataGenClassname =
+ NutchConf.get().get("nutch.random.class", "java.util.Random");
+ Class clazz = Class.forName(rndDataGenClassname);
+ randomDataGeneratorCtor = clazz.getConstructor(new Class[]{Long.TYPE});
+ }
+
+ if (randomDataGeneratorCtor != null) {
+ Object arg[] = {new Long(seed)};
+ return (Random) randomDataGeneratorCtor.newInstance(arg);
+ }
+ } catch (ClassNotFoundException absorb) {
+ } catch (NoSuchMethodException absorb) {
+ } catch (SecurityException absorb) {
+ } catch (InstantiationException absorb) {
+ } catch (IllegalAccessException absorb) {
+ } catch (IllegalArgumentException absorb) {
+ } catch (InvocationTargetException absorb) {
+ }
+
+ // last resort
+ return new java.util.Random(seed);
+ }
+
+ /** Wait for the NDFS datanodes to become quiescent.
+ * The initial implementation is to sleep for some fixed amount of time,
+ * but a better implementation would be to really detect when distributed
+ * operations are completed.
+ * @throws InterruptedException
+ */
+ private void awaitQuiescence() throws InterruptedException {
+ // ToDo: Need observer pattern, not static sleep
+ // Doug suggested that the block report interval could be made shorter
+ // and then observing that would be a good way to know when an operation
+ // was complete (quiescence detect).
+ sleepAtLeast(60000);
+ }
+
+ private void assertBytesEqual(byte[] buffer, byte[] bufferGolden, int len) {
+ for (int i = 0; i < len; i++) {
+ assertEquals(buffer[i], bufferGolden[i]);
+ }
+ }
+
+ private void msg(String s) {
+ //System.out.println(s);
+ LOG.info(s);
+ }
+
+ public static void sleepAtLeast(int tmsec) {
+ long t0 = System.currentTimeMillis();
+ long t1 = t0;
+ long tslept = t1 - t0;
+ while (tmsec > tslept) {
+ try {
+ long tsleep = tmsec - tslept;
+ Thread.sleep(tsleep);
+ t1 = System.currentTimeMillis();
+ } catch (InterruptedException ie) {
+ t1 = System.currentTimeMillis();
+ }
+ tslept = t1 - t0;
+ }
+ }
+
+ public static String summarizeThreadGroup() {
+ int n = 10;
+ int k = 0;
+ Thread[] tarray = null;
+ StringBuffer sb = new StringBuffer(500);
+ do {
+ n = n * 10;
+ tarray = new Thread[n];
+ k = Thread.enumerate(tarray);
+ } while (k == n); // while array is too small...
+ for (int i = 0; i < k; i++) {
+ Thread thread = tarray[i];
+ sb.append(thread.toString());
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
+
+ public static void main(String[] args) throws Exception {
+ String usage = "Usage: TestNDFS (no args)";
+ if (args.length != 0) {
+ System.err.println(usage);
+ System.exit(-1);
+ }
+ String[] testargs = {"org.apache.nutch.ndfs.TestNDFS"};
+ junit.textui.TestRunner.main(testargs);
+ }
+
+}
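
As the awaitQuiescence() comment notes, the fixed sleep is a stopgap; the
usual replacement is a poll-until-condition loop with a deadline once a real
completion signal (such as observing block reports) is available. A minimal
sketch of such a loop (the Check interface and the predicate are
hypothetical, not part of this patch):

    public class AwaitSketch {
        interface Check { boolean done(); }

        // Poll until the check passes or the deadline elapses; returns
        // whether the condition was observed.
        static boolean await(Check check, long timeoutMsec, long pollMsec)
                throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeoutMsec;
            while (System.currentTimeMillis() < deadline) {
                if (check.done()) return true;
                Thread.sleep(pollMsec);
            }
            return check.done();
        }

        public static void main(String[] args) throws InterruptedException {
            final long t0 = System.currentTimeMillis();
            boolean ok = await(new Check() {
                public boolean done() { return System.currentTimeMillis() - t0 > 200; }
            }, 1000L, 50L);
            System.out.println("condition observed: " + ok);
        }
    }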