Update of /cvsroot/nutch/nutch/src/test/net/nutch/util
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv4633/src/test/net/nutch/util
Added Files:
TestNutchFS.java
Log Message:
Full commit for Nutch distributed WebDB.
This is a lot of new code that implements the multi-machine
web database. This means we should be able to update the db
with multiple CPUs and disks simultaneously. (This has been
a major bottleneck for us so far.)
This commit also contains files for the NutchFileSystem, which
is a rudimentary distributed file system. The Distributed WebDB
is built on top of NutchFS. There are two implementations of
NutchFS: one for machines mounting NFS (network file system), and
one for machines that need to use a remote SSL connection. The
former is well-tested, but the latter is still a little sketchy.
I've done what little testing I can do on my laptop. I'm putting
code back so that other people can take a look, and so we can put
it on multiple machines.
Note that I've put changes back to the files "DistributedWebDBWriter"
and "DistributedWebDBReader". These are meant to replace "WebDBWriter"
and "WebDBReader," but I didn't want to disturb the source base
until the distributed code is tested further.
--- NEW FILE: TestNutchFS.java ---
/* Copyright (c) 2003 The Nutch Organization. All rights reserved. */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
package net.nutch.util;
import java.io.*;
import java.util.*;
import junit.framework.*;
/*************************************************
* Unit test for NutchFS classes.
*
* @author Mike Cafarella
*************************************************/
public class TestNutchFS extends TestCase {
/**
 * Command template (based on /bin/cp) handed to NutchRemoteFileSystem.
 * NOTE(review): the %srcpath% / %dstpath% placeholders are presumably
 * substituted by NutchRemoteFileSystem -- confirm against that class.
 */
static final String CP_TEMPLATE = "/bin/cp %srcpath% %dstpath%";

/** Command template (based on /bin/rm) handed to NutchRemoteFileSystem. */
static final String RM_TEMPLATE = "/bin/rm %dstpath%";

/** Command template (based on /bin/mkdir -p) handed to NutchRemoteFileSystem. */
static final String MKDIR_TEMPLATE = "/bin/mkdir -p %dstpath%";

/**
 * Create the TestCase.
 *
 * @param name the test name, passed straight through to the
 *             TestCase superclass constructor
 */
public TestNutchFS(String name) {
    super(name);
}
/*****************************************************
 * An instance of class Tester will perform a sequence of
 * operations on the given NutchFileSystem: it writes one
 * file into every share group, reads back the files in
 * its own share group, renames them, and finally deletes
 * them.  All the Testers of a set must run before any
 * can complete.
 *
 * run() never throws an IOException; the first one it
 * hits (including a content mismatch on read-back) is
 * stored and can be retrieved with getFailure().
 *****************************************************/
class Tester implements Runnable {
    /** File system under test. */
    NutchFileSystem nutchfs;
    /** This worker's id, and the total number of workers in the set. */
    int curPid = -1, numPids = -1;
    /** First IOException hit by run(), or null if none so far. */
    IOException exception;

    /**
     * This Tester has ID 'curPid' of total 'numPids'.
     *
     * @param nutchfs the file system all operations are issued against
     * @param curPid  id of this worker; used in file and share-group names
     * @param numPids total number of workers; one file is written per worker
     */
    public Tester(NutchFileSystem nutchfs, int curPid, int numPids) {
        this.nutchfs = nutchfs;
        this.curPid = curPid;
        this.numPids = numPids;
        this.exception = null;
    }

    /**
     * Perform a fairly elaborate test of a variety of features.
     * Meant to operate when other processes are running.
     *
     * Any IOException aborts the remaining steps and is stored for
     * getFailure() instead of being thrown.
     */
    public void run() {
        try {
            // Emit a single file from this pid for each share group
            System.out.println(" Worker " + curPid + ": writing");
            for (int i = 0; i < numPids; i++) {
                // Grab a new file and write a line of text to it
                File foo = nutchfs.getWorkingFile();
                BufferedWriter out = new BufferedWriter(new FileWriter(foo));
                try {
                    out.write("HelloWorld\n");
                } finally {
                    out.close();
                }

                // Put() it into the Nutch FS, under the share-group 'i'
                NutchFile f = new NutchFile(nutchfs, "db", "share" + i,
                                            new File("f" + curPid));
                nutchfs.put(f, foo, true);
            }

            // Get all the files intended for this share group
            System.out.println(" Worker " + curPid + ": finding");
            for (int i = 0; i < numPids; i++) {
                NutchFile f = new NutchFile(nutchfs, "db", "share" + curPid,
                                            new File("f" + i));
                File foo = nutchfs.get(f);

                // Check sequence from file
                BufferedReader in = new BufferedReader(new FileReader(foo));
                try {
                    String cur = in.readLine();
                    // readLine() returns null for an empty file; treat that
                    // as a mismatch instead of dying with an unrecorded NPE.
                    if (cur == null || ! "HelloWorld".equals(cur.trim())) {
                        System.err.println("ARRGH! Loaded-file says <" + cur +
                                           ">");
                        // Surface the mismatch through getFailure() so the
                        // caller can see that this worker failed.
                        throw new IOException("Worker " + curPid + ": file f" + i +
                                              " in share" + curPid +
                                              " says <" + cur + ">");
                    }
                } finally {
                    in.close();
                }
            }

            // Now rename all the files that this writer waited for
            System.out.println(" Worker " + curPid + ": renaming");
            for (int i = 0; i < numPids; i++) {
                NutchFile f = new NutchFile(nutchfs, "db", "share" + curPid,
                                            new File("f" + i));
                NutchFile fmoved = new NutchFile(nutchfs, "db", "share" + curPid,
                                                 new File("f_moved" + i));
                nutchfs.renameTo(f, fmoved);
            }

            // Now delete all the files that this writer just renamed
            System.out.println(" Worker " + curPid + ": deleting");
            for (int i = 0; i < numPids; i++) {
                NutchFile f = new NutchFile(nutchfs, "db", "share" + curPid,
                                            new File("f_moved" + i));
                nutchfs.delete(f);
            }
            System.out.println(" Worker " + curPid + ": complete");
        } catch (IOException ie) {
            this.exception = ie;
        }
    }

    /**
     * Return stored failure, if any
     *
     * @return the IOException recorded by run(), or null if none was recorded
     */
    public IOException getFailure() {
        return exception;
    }
}
/**
 * fullNFSTest() creates a set of worker threads to simulate
 * multiple processes using a single NutchNFSFileSystem at the
 * same time. The test ends when all workers are finished.
 *
 * The function requires a working directory and a number
 * of workers to create.
 *
 * @param dbRoot     working directory; must not exist yet
 * @param numWorkers number of Tester workers (and file system objects) to create
 * @throws IOException if dbRoot already exists, if creating a file system
 *                     fails, or if fullTest() reports a non-zero result
 */
public void fullNFSTest(File dbRoot, int numWorkers) throws IOException {
    if (dbRoot.exists()) {
        throw new IOException("File " + dbRoot + " already exists.");
    }

    NutchFileSystem nutchfs[] = new NutchFileSystem[numWorkers];
    Tester testers[] = new Tester[numWorkers];
    int numExceptions = 0;

    // Close the filesystem objects when we're done here
    try {
        for (int i = 0; i < testers.length; i++) {
            // Remember each file system so the cleanup loop below really
            // has something to close (the array used to stay all-null).
            nutchfs[i] = new NutchNFSFileSystem(dbRoot, true);
            testers[i] = new Tester(nutchfs[i], i, numWorkers);
        }
        numExceptions = fullTest(testers);
    } finally {
        for (int i = 0; i < nutchfs.length; i++) {
            if (nutchfs[i] == null) {
                // Construction failed before reaching this slot.
                continue;
            }
            try {
                nutchfs[i].close();
            } catch (IOException ie) {
                // Best-effort cleanup: keep closing the remaining ones.
            }
        }
    }

    // fullTest() returns the number of failed workers, or -1 if interrupted.
    if (numExceptions != 0) {
        throw new IOException("fullNFSTest on " + dbRoot + " failed: fullTest returned " +
                              numExceptions);
    }
}
/**
 * fullRFSTest() creates a set of worker threads to simulate
 * multiple different machines all using RemoteFileSystems.
 *
 * Each worker i gets its own root directory "system&lt;i&gt;" under
 * 'dir'.  On success 'dir' is deleted again; on failure it is
 * left in place.
 *
 * @param dir        parent directory for the per-worker roots; must not exist yet
 * @param numWorkers number of simulated machines / Tester workers
 * @throws IOException if dir already exists, if creating a file system
 *                     fails, or if fullTest() reports a non-zero result
 */
public void fullRFSTest(File dir, int numWorkers) throws IOException {
    if (dir.exists()) {
        throw new IOException("File " + dir + " already exists.");
    }

    NutchFileSystem nutchfs[] = new NutchFileSystem[numWorkers];
    Tester testers[] = new Tester[numWorkers];

    //
    // Create the share groups that tell where everything is.
    // Every group lists the same locations (one "system<j>" path
    // per worker, separated by ';'), so build that string once.
    //
    StringBuffer locationDesc = new StringBuffer();
    for (int j = 0; j < testers.length; j++) {
        if (j > 0) {
            locationDesc.append(";");
        }
        locationDesc.append(new File(dir, "system" + j).getPath());
    }
    String locations = locationDesc.toString();

    Vector shareGroups = new Vector();
    for (int i = 0; i < testers.length; i++) {
        shareGroups.add(new ShareGroup("share" + i, locations));
    }

    //
    // Create list of Tester objects, pass in a NutchRemoteFileSystem.
    // Close the filesystem objects when we're done here.
    //
    int numExceptions = 0;
    try {
        for (int i = 0; i < testers.length; i++) {
            File dbRoot = new File(dir, "system" + i);
            nutchfs[i] = new NutchRemoteFileSystem(dbRoot,
                new ShareSet(dbRoot, shareGroups),
                CP_TEMPLATE, RM_TEMPLATE, MKDIR_TEMPLATE);
            testers[i] = new Tester(nutchfs[i], i, numWorkers);
        }
        numExceptions = fullTest(testers);
    } finally {
        for (int i = 0; i < nutchfs.length; i++) {
            if (nutchfs[i] == null) {
                // Construction failed before reaching this slot.
                continue;
            }
            try {
                nutchfs[i].close();
            } catch (IOException ie) {
                // Best-effort cleanup: keep closing the remaining ones.
            }
        }
    }

    if (numExceptions == 0) {
        FileUtil.fullyDelete(dir);
    } else {
        // fullTest() returns the number of failed workers, or -1 if
        // interrupted.  Leave 'dir' in place, but do not let the
        // failure pass silently.
        throw new IOException("fullRFSTest on " + dir + " failed: fullTest returned " +
                              numExceptions);
    }
}
/**
* Start all the threads and wait for the given set of
* Tester objects to complete execution.
*/
int fullTest(Tester testers[]) {
int numWorkers = testers.length;
Thread workers[] = new Thread[numWorkers];
// Kick off a thread per worker
System.out.println("Launching " + numWorkers + " nutchFS clients");
for (int i = 0; i < workers.length; i++) {
workers[i] = new Thread(testers[i]);
workers[i].start();
}
// Wait for the threads to finish
System.out.println("Waiting for workers to complete...");
for (int i = 0; i < workers.length; i++) {
try {
workers[i].join();
} catch (InterruptedException ie) {
System.out.println("Received InterruptedException when waiting for
worker " + i + ". Aborting...");
return -1;
}
}
System.out.println();
System.out.println("All workers complete");
System.out.println();
// Check if any emitted exceptions
int numExceptions = 0;
for (int i = 0; i < testers.length; i++) {
if (testers[i].getFailure() != null) {
System.out.println("Worker " + i + " reported exception " +
testers[i].getFailure());
testers[i].getFailure().printStackTrace();
numExceptions++;
}
}
return numExceptions;
}
/**
 * singleNFSTest() is used if you want to run a real test
 * across many boxes, and you can't just use the standard
 * thread-creator in fullNFSTest().
 *
 * Runs a single Tester synchronously on the calling thread.
 *
 * @param dbRoot  root directory handed to NutchNFSFileSystem
 * @param curPid  id of this worker
 * @param numPids total number of workers taking part
 * @throws IOException if the file system cannot be created or closed,
 *                     or if the Tester recorded a failure
 */
public void singleNFSTest(File dbRoot, int curPid, int numPids) throws IOException
{
    NutchFileSystem nutchfs = new NutchNFSFileSystem(dbRoot, true);
    try {
        Tester t = new Tester(nutchfs, curPid, numPids);
        System.out.println("Launching test for processor " + curPid + " (of " +
                           numPids + ")");
        t.run();
        // run() stores its IOException instead of throwing it; rethrow
        // here so a failed run is not reported as "complete".
        if (t.getFailure() != null) {
            throw t.getFailure();
        }
        System.out.println("Processor " + curPid + " complete.");
    } finally {
        nutchfs.close();
    }
}
/**
 * singleRFSTest() is used to actually test the RemoteFileSystem
 * across many boxes. So, you need to create each instance
 * separately.
 *
 * Runs a single Tester synchronously on the calling thread, rooted
 * at "system&lt;curPid&gt;" under 'dir'.
 *
 * @param dir     parent directory of the per-worker "system&lt;j&gt;" roots
 * @param curPid  id of this worker
 * @param numPids total number of workers taking part
 * @throws IOException if the file system cannot be created or closed,
 *                     or if the Tester recorded a failure
 */
public void singleRFSTest(File dir, int curPid, int numPids) throws IOException {
    //
    // Create sharegroups.  Every group lists the same locations
    // (one "system<j>" path per worker, separated by ';'), so
    // build that string once.
    //
    StringBuffer locationDesc = new StringBuffer();
    for (int j = 0; j < numPids; j++) {
        if (j > 0) {
            locationDesc.append(";");
        }
        locationDesc.append(new File(dir, "system" + j).getPath());
    }
    String locations = locationDesc.toString();

    Vector shareGroups = new Vector();
    for (int i = 0; i < numPids; i++) {
        shareGroups.add(new ShareGroup("share" + i, locations));
    }

    File dbRoot = new File(dir, "system" + curPid);

    // Create nutch remote file system
    NutchFileSystem nutchfs = new NutchRemoteFileSystem(dbRoot,
        new ShareSet(dbRoot, shareGroups),
        CP_TEMPLATE, RM_TEMPLATE, MKDIR_TEMPLATE);
    try {
        Tester t = new Tester(nutchfs, curPid, numPids);
        System.out.println("Launching test for processor " + curPid + " (of " +
                           numPids + ")");
        t.run();
        // run() stores its IOException instead of throwing it; rethrow
        // here so a failed run is not reported as "complete".
        if (t.getFailure() != null) {
            throw t.getFailure();
        }
        System.out.println("Processor " + curPid + " complete.");
    } finally {
        nutchfs.close();
    }
}
/**
 * JUnit entry point for the NutchNFSFileSystem test.
 *
 * Reserves a unique temp-file name, removes the placeholder file so
 * the path does not exist, runs fullNFSTest() there with 16 simulated
 * processors on this one box, and then deletes the path again.
 *
 * @throws IOException if the temp name cannot be created or
 *                     fullNFSTest() fails
 */
public void testNutchNFS() throws IOException {
    final int simulatedProcessors = 16;

    // createTempFile() only serves to pick a fresh name; fullNFSTest()
    // insists that its root does not exist yet.
    File scratchRoot = File.createTempFile("testnutchnfs", "tmp");
    scratchRoot.delete();

    fullNFSTest(scratchRoot, simulatedProcessors);

    FileUtil.fullyDelete(scratchRoot);
}
/**
 * JUnit entry point for the NutchRemoteFileSystem test.
 *
 * Reserves a unique temp-file name, removes the placeholder file so
 * the path does not exist, and runs fullRFSTest() there with 4
 * simulated machines.
 *
 * @throws IOException if the temp name cannot be created or
 *                     fullRFSTest() fails
 */
public void testNutchRFS() throws IOException {
    final int simulatedMachines = 4;

    // createTempFile() only serves to pick a fresh name; fullRFSTest()
    // insists that its directory does not exist yet.
    File scratchDir = File.createTempFile("testnutchrfs", "tmp");
    scratchDir.delete();

    fullRFSTest(scratchDir, simulatedMachines);
}
/**
*/
public static void main(String argv[]) throws IOException {
if (argv.length < 2) {
System.out.println("Usage: java net.nutch.util.TestNutchFS (-fulltest
nfs|rfs <db> <numPids>) (-singletest nfs|rfs <db> <pid> <numPids>) (-defaulttest
nfs|rfs)");
return;
}
String dbName = argv[0], testType = null;
boolean fulltest = false, singletest = false, defaulttest = false;
int curPid = 0, numPids = 0;
for (int i = 0; i < argv.length; i++) {
if ("-fulltest".equals(argv[i])) {
fulltest = true;
testType = argv[i+1];
dbName = argv[i+2];
numPids = Integer.parseInt(argv[i+3]);
i+=3;
} else if ("-singletest".equals(argv[i])) {
singletest = true;
testType = argv[i+1];
dbName = argv[i+2];
curPid = Integer.parseInt(argv[i+3]);
numPids = Integer.parseInt(argv[i+4]);
i+=4;
} else if ("-defaulttest".equals(argv[i])) {
defaulttest = true;
testType = argv[i+1];
i++;
}
}
TestNutchFS tnf = new TestNutchFS("testNutchFS");
File dbRoot = new File(dbName);
if (fulltest) {
if ("nfs".equals(testType)) {
tnf.fullNFSTest(dbRoot, numPids);
} else if ("rfs".equals(testType)) {
tnf.fullRFSTest(dbRoot, numPids);
} else {
System.out.println("Sorry, no fulltest type called: " + testType);
}
} else if (singletest) {
if ("nfs".equals(testType)) {
tnf.singleNFSTest(dbRoot, curPid, numPids);
} else if ("rfs".equals(testType)) {
tnf.singleRFSTest(dbRoot, curPid, numPids);
} else {
System.out.println("Sorry, no singletest type called: " + testType);
}
} else if (defaulttest) {
if ("nfs".equals(testType)) {
tnf.testNutchNFS();
} else if ("rfs".equals(testType)) {
tnf.testNutchRFS();
} else {
System.out.println("Sorry, no defaulttest type called: " + testType);
}
} else {
System.out.println("No test selected");
}
}
}
-------------------------------------------------------
The SF.Net email is sponsored by EclipseCon 2004
Premiere Conference on Open Tools Development and Integration
See the breadth of Eclipse activity. February 3-5 in Anaheim, CA.
http://www.eclipsecon.org/osdn
_______________________________________________
Nutch-cvs mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/nutch-cvs