Update of /cvsroot/nutch/nutch/src/test/net/nutch/util
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv4633/src/test/net/nutch/util

Added Files:
        TestNutchFS.java 
Log Message:

  Full commit for Nutch distributed WebDB.

  This is a lot of new code that implements the multi-machine
web database.  This means we should be able to update the db
with multiple CPUs and disks simultaneously.  (This has been
a major bottleneck for us so far.)

  This commit also contains files for the NutchFileSystem, which
is a rudimentary distributed file system.  The Distributed WebDB
is built on top of NutchFS.  There are two implementations of
NutchFS: one for machines mounting NFS (network file system), and
one for machines that need to use a remote SSL connection.  The
former is well-tested, but the latter is still a little sketchy.

  I've done what little testing I can do on my laptop.  I'm putting
code back so that other people can take a look, and so we can put
it on multiple machines.

  Note that I've put changes back to the files "DistributedWebDBWriter"
and "DistributedWebDBReader".  These are meant to replace "WebDBWriter" 
and "WebDBReader," but I didn't want to disturb the source base
until the distributed code is tested further.   



--- NEW FILE: TestNutchFS.java ---
/* Copyright (c) 2003 The Nutch Organization.  All rights reserved.   */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */

package net.nutch.util;

import java.io.*;
import java.util.*;
import junit.framework.*;

/*************************************************
 * Unit test for NutchFS classes.
 *
 * @author Mike Cafarella
 *************************************************/
public class TestNutchFS extends TestCase {
    // Shell command templates handed to the NutchRemoteFileSystem constructor
    // by fullRFSTest() and singleRFSTest(): copy, remove, and make-directory.
    // NOTE(review): the %srcpath% / %dstpath% tokens are presumably substituted
    // by NutchRemoteFileSystem before a command is run -- confirm there.
    static String CP_TEMPLATE = "/bin/cp %srcpath% %dstpath%";
    static String RM_TEMPLATE = "/bin/rm %dstpath%";
    static String MKDIR_TEMPLATE = "/bin/mkdir -p %dstpath%";

    /**
     * Create the TestCase.
     *
     * @param name the test name, passed straight through to the
     *             junit.framework.TestCase constructor
     */
    public TestNutchFS(String name) {
        super(name);
    }

    /*****************************************************
     * An instance of class Tester will perform a sequence of
     * operations on the given NutchFileSystem: it writes one file
     * into every share group, reads back the files found in its
     * own share group, then renames and finally deletes them.
     * All the Testers of a set must run before any can complete.
     *
     * run() never throws.  The first problem it hits is stored and
     * can be collected with getFailure() once the worker is done.
     *****************************************************/
    class Tester implements Runnable {
        NutchFileSystem nutchfs;
        int curPid = -1, numPids = -1;
        IOException exception;

        /**
         * This Tester has ID 'curPid' of total 'numPids'.
         *
         * @param nutchfs the file system this worker operates on
         * @param curPid  ID of this worker; also selects its share group
         * @param numPids total number of workers in the set
         */
        public Tester(NutchFileSystem nutchfs, int curPid, int numPids) {
            this.nutchfs = nutchfs;
            this.curPid = curPid;
            this.numPids = numPids;
            this.exception = null;
        }

        /**
         * Perform a fairly elaborate test of a variety of features.
         * Meant to operate when other processes are running.
         *
         * Any failure (I/O error, unexpected file content, or an
         * unchecked exception) aborts the remaining steps and is
         * kept for getFailure().
         */
        public void run() {
            try {
                // Emit a single file from this pid for each share group
                System.out.println("  Worker " + curPid + ": writing");
                for (int i = 0; i < numPids; i++) {
                    // Grab a new file and write a line of text to it
                    File foo = nutchfs.getWorkingFile();
                    BufferedWriter out = new BufferedWriter(new FileWriter(foo));
                    try {
                        out.write("HelloWorld\n");
                    } finally {
                        out.close();
                    }

                    // Put() it into the Nutch FS, under the share-group 'i'
                    NutchFile f = new NutchFile(nutchfs, "db", "share" + i, new File("f" + curPid));
                    nutchfs.put(f, foo, true);
                }

                // Get all the files intended for this share group
                System.out.println("  Worker " + curPid + ": finding");
                for (int i = 0; i < numPids; i++) {
                    NutchFile f = new NutchFile(nutchfs, "db", "share" + curPid, new File("f" + i));
                    File foo = nutchfs.get(f);

                    // Check sequence from file
                    BufferedReader in = new BufferedReader(new FileReader(foo));
                    try {
                        // readLine() yields null for an empty file, so test
                        // for that before trimming.
                        String cur = in.readLine();
                        if (cur == null || ! "HelloWorld".equals(cur.trim())) {
                            System.err.println("ARRGH!  Loaded-file says <" + cur + ">");
                            // Wrong content is a test failure, not just a log line.
                            throw new IOException("Worker " + curPid + ": file f" + i
                                                  + " in share" + curPid + " contained <"
                                                  + cur + "> instead of <HelloWorld>");
                        }
                    } finally {
                        in.close();
                    }
                }

                // Now rename all the files that this writer waited for
                System.out.println("  Worker " + curPid + ": renaming");
                for (int i = 0; i < numPids; i++) {
                    NutchFile f = new NutchFile(nutchfs, "db", "share" + curPid, new File("f" + i));
                    NutchFile fmoved = new NutchFile(nutchfs, "db", "share" + curPid, new File("f_moved" + i));
                    nutchfs.renameTo(f, fmoved);
                }

                // Now delete all the files that this writer just renamed
                System.out.println("  Worker " + curPid + ": deleting");
                for (int i = 0; i < numPids; i++) {
                    NutchFile f = new NutchFile(nutchfs, "db", "share" + curPid, new File("f_moved" + i));
                    nutchfs.delete(f);
                }

                System.out.println("  Worker " + curPid + ": complete");
            } catch (IOException ie) {
                this.exception = ie;
            } catch (RuntimeException re) {
                // Without this an unchecked exception would end the worker
                // thread while getFailure() still reported success.
                IOException wrapped = new IOException("Worker " + curPid + " failed unexpectedly: " + re);
                wrapped.initCause(re);
                this.exception = wrapped;
            }
        }

        /**
         * Return stored failure, if any
         *
         * @return the failure recorded by run(), or null if none was recorded
         */
        public IOException getFailure() {
            return exception;
        }
    }

    /**
     * fullNFSTest() creates a set of worker threads to simulate
     * multiple processes using a single NutchNFSFileSystem at the
     * same time.  The test ends when all workers are finished.
     *
     * The function requires a working directory and a number
     * of workers to create.
     *
     * @param dbRoot     root directory for the file systems; must not exist yet
     * @param numWorkers number of workers (one file system and thread each)
     * @throws IOException if dbRoot already exists, or if fullTest()
     *         returned anything other than zero (a positive count of
     *         failed workers, or -1 when the wait was interrupted)
     */
    public void fullNFSTest(File dbRoot, int numWorkers) throws IOException {
        if (dbRoot.exists()) {
            throw new IOException("File " + dbRoot + " already exists.");
        }

        NutchFileSystem nutchfs[] = new NutchFileSystem[numWorkers];
        Tester testers[] = new Tester[numWorkers];
        int numExceptions = 0;

        // Close the filesystem objects when we're done here
        try {
            for (int i = 0; i < testers.length; i++) {
                // Remember each file system so the finally block can close it.
                nutchfs[i] = new NutchNFSFileSystem(dbRoot, true);
                testers[i] = new Tester(nutchfs[i], i, numWorkers);
            }
            numExceptions = fullTest(testers);
        } finally {
            for (int i = 0; i < nutchfs.length; i++) {
                if (nutchfs[i] == null) {
                    // Setup failed before this slot was filled.
                    continue;
                }
                try {
                    nutchfs[i].close();
                } catch (IOException ie) {
                    // Best-effort cleanup; keep closing the remaining ones.
                }
            }
        }

        if (numExceptions != 0) {
            throw new IOException("NFS test did not complete cleanly (fullTest returned "
                                  + numExceptions + ")");
        }
    }

    /**
     * fullRFSTest() creates a set of worker threads to simulate
     * multiple different machines all using RemoteFileSystems.
     * Each worker i gets its own root directory "system&lt;i&gt;"
     * under 'dir'.
     *
     * On success 'dir' is deleted again.  On failure it is left
     * in place and an IOException is thrown.
     *
     * @param dir        parent directory for all simulated machines; must not exist yet
     * @param numWorkers number of simulated machines
     * @throws IOException if dir already exists, or if fullTest()
     *         returned anything other than zero (a positive count of
     *         failed workers, or -1 when the wait was interrupted)
     */
    public void fullRFSTest(File dir, int numWorkers) throws IOException {
        if (dir.exists()) {
            throw new IOException("File " + dir + " already exists.");
        }

        NutchFileSystem nutchfs[] = new NutchFileSystem[numWorkers];
        Tester testers[] = new Tester[numWorkers];

        //
        // Every share group lists the same locations (the roots of all
        // simulated machines, separated by ';'), so build that string once.
        //
        StringBuffer locationDesc = new StringBuffer();
        for (int j = 0; j < testers.length; j++) {
            File dbRoot = new File(dir, "system" + j);
            locationDesc.append(dbRoot.getPath());
            if (j+1 < testers.length) {
                locationDesc.append(";");
            }
        }
        String locations = locationDesc.toString();

        //
        // One share group per worker
        //
        Vector shareGroups = new Vector();
        for (int i = 0; i < testers.length; i++) {
            shareGroups.add(new ShareGroup("share" + i, locations));
        }

        //
        // Close the filesystem objects when we're done here.
        //
        int numExceptions = 0;
        try {
            // Create list of Tester objects, pass in a NutchRemoteFileSystem.
            // Done inside the try so a failure partway through still closes
            // the file systems that were already created.
            for (int i = 0; i < testers.length; i++) {
                File dbRoot = new File(dir, "system" + i);
                nutchfs[i] = new NutchRemoteFileSystem(dbRoot, new ShareSet(dbRoot, shareGroups),
                                                       CP_TEMPLATE, RM_TEMPLATE, MKDIR_TEMPLATE);
                testers[i] = new Tester(nutchfs[i], i, numWorkers);
            }
            numExceptions = fullTest(testers);
        } finally {
            for (int i = 0; i < nutchfs.length; i++) {
                if (nutchfs[i] == null) {
                    // Setup failed before this slot was filled.
                    continue;
                }
                try {
                    nutchfs[i].close();
                } catch (IOException ie) {
                    // Best-effort cleanup; keep closing the remaining ones.
                }
            }
        }

        if (numExceptions == 0) {
            FileUtil.fullyDelete(dir);
        } else {
            // The directory is kept so the failure can be inspected.
            throw new IOException("RFS test did not complete cleanly (fullTest returned "
                                  + numExceptions + "); files left in " + dir);
        }
    }

    /**
     * Start all the threads and wait for the given set of
     * Tester objects to complete execution.
     */
    int fullTest(Tester testers[]) {
        int numWorkers = testers.length;
        Thread workers[] = new Thread[numWorkers];

        // Kick off a thread per worker
        System.out.println("Launching " + numWorkers + " nutchFS clients");
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(testers[i]);
            workers[i].start();
        }

        // Wait for the threads to finish
        System.out.println("Waiting for workers to complete...");
        for (int i = 0; i < workers.length; i++) {
            try {
                workers[i].join();
            } catch (InterruptedException ie) {
                System.out.println("Received InterruptedException when waiting for 
worker " + i + ".  Aborting...");
                return -1;
            }
        }
        System.out.println();
        System.out.println("All workers complete");
        System.out.println();

        // Check if any emitted exceptions
        int numExceptions = 0;
        for (int i = 0; i < testers.length; i++) {
            if (testers[i].getFailure() != null) {
                System.out.println("Worker " + i + " reported exception " + 
testers[i].getFailure());
                testers[i].getFailure().printStackTrace();
                numExceptions++;
            }
        }

        return numExceptions;
    }

    /**
     * singleNFSTest() is used if you want to run a real test
     * across many boxes, and you can't just use the standard
     * thread-creator in fullNFSTest().  It runs exactly one
     * Tester, in the calling thread.
     *
     * @param dbRoot  root directory handed to NutchNFSFileSystem
     * @param curPid  ID of the worker run by this call
     * @param numPids total number of workers taking part
     * @throws IOException if the worker recorded a failure, or if
     *         closing the file system fails
     */
    public void singleNFSTest(File dbRoot, int curPid, int numPids) throws IOException {
        NutchFileSystem nutchfs = new NutchNFSFileSystem(dbRoot, true);
        try {
            Tester t = new Tester(nutchfs, curPid, numPids);
            System.out.println("Launching test for processor " + curPid + " (of " + numPids + ")");
            t.run();

            // run() stores its failure instead of throwing; surface it here
            // rather than announcing completion.
            IOException failure = t.getFailure();
            if (failure != null) {
                throw failure;
            }
            System.out.println("Processor " + curPid + " complete.");
        } finally {
            nutchfs.close();
        }
    }

    /**
     * singleRFSTest() is used to actually test the RemoteFileSystem
     * across many boxes.  So, you need to create each instance
     * separately.  It runs exactly one Tester, in the calling thread,
     * rooted at "system&lt;curPid&gt;" under 'dir'.
     *
     * @param dir     parent directory of all the "system&lt;n&gt;" roots
     * @param curPid  ID of the worker run by this call
     * @param numPids total number of workers taking part
     * @throws IOException if the worker recorded a failure, or if
     *         closing the file system fails
     */
    public void singleRFSTest(File dir, int curPid, int numPids) throws IOException {
        //
        // Every share group lists the same locations (the roots of all
        // machines, separated by ';'), so build that string once.
        //
        StringBuffer locationDesc = new StringBuffer();
        for (int j = 0; j < numPids; j++) {
            File root = new File(dir, "system" + j);
            locationDesc.append(root.getPath());
            if (j+1 < numPids) {
                locationDesc.append(";");
            }
        }
        String locations = locationDesc.toString();

        //
        // Create sharegroups
        //
        Vector shareGroups = new Vector();
        for (int i = 0; i < numPids; i++) {
            shareGroups.add(new ShareGroup("share" + i, locations));
        }

        File dbRoot = new File(dir, "system" + curPid);

        // Create nutch remote file system
        NutchFileSystem nutchfs = new NutchRemoteFileSystem(dbRoot, new ShareSet(dbRoot, shareGroups),
                                                            CP_TEMPLATE, RM_TEMPLATE, MKDIR_TEMPLATE);
        try {
            Tester t = new Tester(nutchfs, curPid, numPids);
            System.out.println("Launching test for processor " + curPid + " (of " + numPids + ")");
            t.run();

            // run() stores its failure instead of throwing; surface it here
            // rather than announcing completion.
            IOException failure = t.getFailure();
            if (failure != null) {
                throw failure;
            }
            System.out.println("Processor " + curPid + " complete.");
        } finally {
            nutchfs.close();
        }
    }

    /**
     * JUnit entry point for the NutchNFSFileSystem: a single-box
     * simulation of 16 processors sharing a scratch directory,
     * which is removed again once fullNFSTest() returns.
     */
    public void testNutchNFS() throws IOException {
        // createTempFile() is used only to obtain a unique path; the
        // file itself is dropped because fullNFSTest() rejects a path
        // that already exists.
        File scratchRoot = File.createTempFile("testnutchnfs", "tmp");
        scratchRoot.delete();

        final int simulatedProcessors = 16;
        fullNFSTest(scratchRoot, simulatedProcessors);

        FileUtil.fullyDelete(scratchRoot);
    }

    /**
     * JUnit entry point for the NutchRemoteFileSystem: a test of
     * 4 simulated machines under a scratch directory.
     */
    public void testNutchRFS() throws IOException {
        // createTempFile() is used only to obtain a unique path; the
        // file itself is dropped because fullRFSTest() rejects a path
        // that already exists.
        File scratchRoot = File.createTempFile("testnutchrfs", "tmp");
        scratchRoot.delete();

        final int simulatedMachines = 4;
        fullRFSTest(scratchRoot, simulatedMachines);
    }

    /**
     */
    public static void main(String argv[]) throws IOException {
        if (argv.length < 2) {
            System.out.println("Usage: java net.nutch.util.TestNutchFS (-fulltest 
nfs|rfs <db> <numPids>) (-singletest nfs|rfs <db> <pid> <numPids>) (-defaulttest 
nfs|rfs)");
            return;
        }

        String dbName = argv[0], testType = null;
        boolean fulltest = false, singletest = false, defaulttest = false;
        int curPid = 0, numPids = 0;
        for (int i = 0; i < argv.length; i++) {
            if ("-fulltest".equals(argv[i])) {
                fulltest = true;
                testType = argv[i+1];
                dbName = argv[i+2];
                numPids = Integer.parseInt(argv[i+3]);
                i+=3;
            } else if ("-singletest".equals(argv[i])) {
                singletest = true;
                testType = argv[i+1];
                dbName = argv[i+2];                
                curPid = Integer.parseInt(argv[i+3]);
                numPids = Integer.parseInt(argv[i+4]);
                i+=4;
            } else if ("-defaulttest".equals(argv[i])) {
                defaulttest = true;
                testType = argv[i+1];
                i++;
            }
        }

        TestNutchFS tnf = new TestNutchFS("testNutchFS");
        File dbRoot = new File(dbName);
        if (fulltest) {
            if ("nfs".equals(testType)) {
                tnf.fullNFSTest(dbRoot, numPids);
            } else if ("rfs".equals(testType)) {
                tnf.fullRFSTest(dbRoot, numPids);
            } else {
                System.out.println("Sorry, no fulltest type called: " + testType);
            }
        } else if (singletest) {
            if ("nfs".equals(testType)) {
                tnf.singleNFSTest(dbRoot, curPid, numPids);
            } else if ("rfs".equals(testType)) {
                tnf.singleRFSTest(dbRoot, curPid, numPids);
            } else {
                System.out.println("Sorry, no singletest type called: " + testType);
            }
        } else if (defaulttest) {
            if ("nfs".equals(testType)) {
                tnf.testNutchNFS();
            } else if ("rfs".equals(testType)) {
                tnf.testNutchRFS();
            } else {
                System.out.println("Sorry, no defaulttest type called: " + testType);
            }
        } else {
            System.out.println("No test selected");
        }
    }
}



-------------------------------------------------------
The SF.Net email is sponsored by EclipseCon 2004
Premiere Conference on Open Tools Development and Integration
See the breadth of Eclipse activity. February 3-5 in Anaheim, CA.
http://www.eclipsecon.org/osdn
_______________________________________________
Nutch-cvs mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/nutch-cvs

Reply via email to