Update of /cvsroot/nutch/nutch/src/java/net/nutch/tools
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv2774/src/java/net/nutch/tools

Added Files:
        ParseSegment.java 
Log Message:
Introduce option -noParsing to fetcher and add ParseSegment.java.


--- NEW FILE: ParseSegment.java ---
/* Copyright (c) 2004 The Nutch Organization.  All rights reserved.   */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */

package net.nutch.tools;

import net.nutch.pagedb.FetchListEntry;
import net.nutch.io.*;
import net.nutch.util.*;
import net.nutch.protocol.*;
import net.nutch.parse.*;
import net.nutch.plugin.*;

import net.nutch.fetcher.FetcherOutput;

import java.io.File;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import java.util.Properties;
import java.util.logging.*;

/**
 * Parse contents in one segment.
 *
 * <p>
 * It assumes, under given segment, existence of ./fetcher_output/,
 * which is typically generated after a non-parsing fetcher run
 * (i.e., fetcher is started with option -noParsing).
 *
 * <p> Contents in one segment are parsed and saved in these steps:
 * <li> (1) ./fetcher_output/ and ./content/ are looped together
 * (possibly by multiple ParserThreads), and content is parsed for each entry.
 * The entry number and resultant ParserOutput are saved in ./parser.unsorted.
 * <li> (2) ./parser.unsorted is sorted by entry number, result saved as
 * ./parser.sorted.
 * <li> (3) ./parser.sorted and ./fetcher_output/ are looped together.
 * At each entry, ParserOutput is split into ParseData and ParseText,
 * which are saved in ./parse_data/ and ./parse_text/ respectively. Also
 * updated is FetcherOutput with parsing status, which is saved in ./fetcher/.
 *
 * <p> In the end, ./fetcher/ should be identical to the one produced by a
 * fetcher run WITHOUT option -noParsing.
 *
 * <p> By default, intermediates ./parser.unsorted and ./parser.sorted
 * are removed at the end, unless option -noClean is used. However
 * ./fetcher_output/ is kept intact.
 *
 * <p> Check Fetcher.java and FetcherOutput.java for further discussion.
 *
 * @author John Xing
 */

public class ParseSegment {

  // logger shared by this class and its inner classes
  public static final Logger LOG =
    LogFormatter.getLogger(ParseSegment.class.getName());

  // number of ParserThreads spawned by parse(); read from the configuration
  // key "parser.threads.parse" (10 if unset), overridable via setThreadCount()
  private int threadCount =                       // max number of threads
    NutchConf.getInt("parser.threads.parse", 10);

  // file system handle given to the constructor; used for all file access
  private NutchFileSystem nfs;

  // segment dir
  private String directory;

  // readers for FetcherOutput (no-parsing) and Content; opened by parse()
  // and advanced together by the ParserThreads while holding the lock on this
  private ArrayFile.Reader fetcherNPReader;
  private ArrayFile.Reader contentReader;

  // SequenceFile (unsorted) for ParserOutput: written by parse(), read by
  // sort(). Both stay null in dry-run mode.
  private File unsortedFile;
  private SequenceFile.Writer parserOutputWriter;

  // SequenceFile (sorted) for ParserOutput: written by sort(), read by save().
  // Stays null in dry-run mode.
  private File sortedFile;

  // whether dryRun only (i.e., no real parsing is done)
  private boolean dryRun = false;

  // whether to delete the intermediate unsorted/sorted files once consumed
  private boolean clean = true;

  // entry (record number) in fetcherNPReader (same in contentReader);
  // -1 until the first pair is read, incremented under the lock on this
  private long entry = -1;

  // for stats; pages, bytes and errors are updated under the lock on this,
  // start is reset at the beginning of parse(), sort() and save()
  private long start;                             // start time
  private long bytes;                             // total bytes parsed
  private int pages;                              // total pages parsed
  private int errors;                             // total pages errored

  // all ParserThreads are created in this group; parse() polls its
  // activeCount() to find out when they have finished
  private ThreadGroup group = new ThreadGroup("parser"); // our thread group

  /**
   * Worker thread that parses entries of the segment.
   *
   * <p>Each thread repeatedly claims the next (FetcherOutput, Content) pair
   * from the shared readers while holding the lock on the enclosing
   * ParseSegment, parses the content outside that lock, and appends the
   * resulting ParserOutput, keyed by entry number, to the unsorted
   * SequenceFile. Nothing is written in dry-run mode.
   */
  private class ParserThread extends Thread {

    // entry (record number) claimed by this thread in the current iteration
    private long myEntry = -1;

    // timestamps (ms) of the current iteration, only reported at FINE level:
    // t0 loop start, t1 read lock acquired, t2 read done,
    // t3 about to write, t4 write lock acquired, t5 write done
    private long t0,t1,t2,t3,t4,t5;

    public ParserThread() { super(group, "myThread"); }

    /**
     * Loops over entries of FetcherOutput and Content until either reader
     * is exhausted or a SEVERE message has been logged.
     */
    public void run() {

      // reused for every entry read by this thread
      FetcherOutput fetcherOutput = new FetcherOutput();
      Content content = new Content();

      while (true) {
        if (LogFormatter.hasLoggedSevere())       // something bad happened
          break;                                  // exit

        // Per-entry state, reset on every iteration. If these survived from
        // the previous entry, a failure while reading the next one would be
        // blamed on the previous URL and a second record would be written
        // for the previous entry number.
        FetchListEntry fle = null;
        String url = null;

        t0 = System.currentTimeMillis();

        try {

          // must be read in order! thus synchronize threads.
          synchronized (ParseSegment.this) {
            t1 = System.currentTimeMillis();

            if (fetcherNPReader.next(fetcherOutput) == null ||
                contentReader.next(content) == null)
              return;                             // no more entries

            entry++;
            myEntry = entry;
            if (LOG.isLoggable(Level.FINE))
              LOG.fine("Read in entry "+entry);
          }

          t2 = System.currentTimeMillis();

          fle = fetcherOutput.getFetchListEntry();
          url = fle.getPage().getURL().toString();

          LOG.info("parsing " + url);            // parse the page

          // safe guard against mismatched files; the SEVERE message makes
          // this thread leave the loop at the top of the next iteration
          if (!url.equals(content.getUrl())) {
            LOG.severe("Mismatched entries under "
              + FetcherOutput.DIR_NAME_NP + " and " + Content.DIR_NAME);
            continue;
          }

          if (fetcherOutput.getStatus() == FetcherOutput.SUCCESS) {
            handleContent(url, content);
            synchronized (ParseSegment.this) {
              pages++;                    // record successful parse
              bytes += content.getContent().length;
              if ((pages % 100) == 0)
                status();
            }
          } else {
            // errored at fetch step
            logError(url, new ProtocolException("Error at fetch stage"));
            handleNoContent(ParserOutput.NOFETCH);
          }

        } catch (ParseException e) {
          logError(url, e);
          handleNoContent(ParserOutput.FAILURE);

        } catch (Throwable t) {                   // an unchecked exception
          if (fle != null) {
            // the current entry was read; record it as failed
            logError(url, t);
            handleNoContent(ParserOutput.UNKNOWN);
          } else {
            // failed before the entry was available; nothing to attribute
            LOG.log(Level.SEVERE, "Unexpected exception", t);
          }
        }
      }
    }

    /**
     * Logs a failed parse at INFO (plus the stack trace at FINE) and counts
     * it in the shared error total.
     */
    private void logError(String url, Throwable t) {
      LOG.info("parse of " + url + " failed with: " + t);
      if (LOG.isLoggable(Level.FINE))
        LOG.log(Level.FINE, "stack", t);               // stack trace
      synchronized (ParseSegment.this) {               // record failure
        errors++;
      }
    }

    /**
     * Parses one successfully fetched page and writes the result with
     * status SUCCESS. In dry-run mode only the content type is logged.
     *
     * @throws ParseException propagated from the parser lookup or the parse
     */
    private void handleContent(String url, Content content)
      throws ParseException {

      String contentType = content.getMetadata().getProperty("Content-Type");

      if (ParseSegment.this.dryRun) {
        LOG.info("To be handled as Content-Type: "+contentType);
        return;
      }

      Parser parser = ParserFactory.getParser(contentType, url);
      Parse parse = parser.getParse(content);

      outputPage
        (new ParseText(parse.getText()), parse.getData(),ParserOutput.SUCCESS);
    }

    /**
     * Writes an empty ParseText/ParseData pair with the given status, so
     * that every entry read still gets a record. A no-op in dry-run mode
     * apart from logging.
     */
    private void handleNoContent(int status) {
      if (ParseSegment.this.dryRun) {
        LOG.info("To be handled as no content");
        return;
      }
      outputPage(new ParseText(""),
                 new ParseData("", new Outlink[0], new Properties()),
                 status);
    }

    /**
     * Appends one ParserOutput keyed by this thread's current entry number.
     * Any failure is logged as SEVERE rather than thrown.
     */
    private void outputPage
      (ParseText parseText, ParseData parseData, int status) {
      try {
        t3 = System.currentTimeMillis();
        synchronized (parserOutputWriter) {       // one writer, many threads
          t4 = System.currentTimeMillis();
          parserOutputWriter.append(new LongWritable(myEntry),
            new ParserOutput(parseData, parseText, status));
          t5 = System.currentTimeMillis();
          if (LOG.isLoggable(Level.FINE))
            LOG.fine("Entry: "+myEntry
              +" "+parseData.getMetadata().getProperty("Content-Length")
              +" wait="+(t1-t0) +" read="+(t2-t1) +" parse="+(t3-t2)
              +" wait="+(t4-t3) +" write="+(t5-t4) +"ms");
        }
      } catch (Throwable t) {
        LOG.severe("error writing output:" + t.toString());
      }
    }

  }

  /**
   * Inner class ParserOutput: ParseData + ParseText + status
   */
  private class ParserOutput extends VersionedWritable {
    public static final String DIR_NAME = "parser";

    private final static byte VERSION = 1;

    // could be more detailed
    public final static byte UNKNOWN = (byte)0; // unknown problem in parsing
    public final static byte SUCCESS = (byte)1; // parsing succeeded
    public final static byte FAILURE = (byte)2; // parsing failed
    public final static byte NOFETCH = (byte)3; // fetch was not a SUCCESS

    private int status;

    private ParseData parseData = new ParseData();
    private ParseText parseText = new ParseText();

    public ParserOutput() {}
    
    public ParserOutput(ParseData parseData, ParseText parseText, int status) {
      this.parseData = parseData;
      this.parseText = parseText;
      this.status = status;
    }

    public byte getVersion() { return VERSION; }

    public ParseData getParseData() {
      return this.parseData;
    }

    public ParseText getParseText() {
      return this.parseText;
    }

    public int getStatus() {
      return this.status;
    }

    public final void readFields(DataInput in) throws IOException {
      super.readFields(in);                         // check version
      status = in.readByte();
      parseData.readFields(in);
      parseText.readFields(in);
      return;
    }

    public final void write(DataOutput out) throws IOException {
      super.write(out);                             // write version
      out.writeByte(status);
      parseData.write(out);
      parseText.write(out);
      return;
    }
  }
                        
  /**
   * ParseSegment constructor
   */
  public ParseSegment(NutchFileSystem nfs, String directory, boolean dryRun)
    throws IOException {

    File file;

    this.nfs = nfs;
    this.directory = directory;
    this.dryRun = dryRun;

    // FetcherOutput.DIR_NAME_NP must exist
    file = new File(directory, FetcherOutput.DIR_NAME_NP);
    if (!nfs.exists(file))
      throw new IOException("Directory missing: "+FetcherOutput.DIR_NAME_NP);

    if (dryRun)
      return;

    // clean old FetcherOutput.DIR_NAME
    file = new File(directory, FetcherOutput.DIR_NAME);
    if (nfs.exists(file)) {
      LOG.info("Deleting old "+file.getName());
      nfs.delete(file);
    }

    // clean old unsortedFile
    this.unsortedFile = new File(directory, ParserOutput.DIR_NAME+".unsorted");
    if (nfs.exists(this.unsortedFile)) {
      LOG.info("Deleting old "+this.unsortedFile.getName());
      nfs.delete(this.unsortedFile);
    }

    // clean old sortedFile
    this.sortedFile = new File(directory, ParserOutput.DIR_NAME+".sorted");
    if (nfs.exists(this.sortedFile)) {
      LOG.info("Deleting old "+this.sortedFile.getName());
      nfs.delete(this.sortedFile);
    }

    // clean old ParseData.DIR_NAME
    file = new File(directory, ParseData.DIR_NAME);
    if (nfs.exists(file)) {
      LOG.info("Deleting old "+file.getName());
      nfs.delete(file);
    }

    // clean old ParseText.DIR_NAME
    file = new File(directory, ParseText.DIR_NAME);
    if (nfs.exists(file)) {
      LOG.info("Deleting old "+file.getName());
      nfs.delete(file);
    }

  }

  /**
   * Overrides the number of parser threads that parse() will spawn.
   *
   * @param threadCount number of threads to start
   */
  public void setThreadCount(int threadCount) {
    this.threadCount = threadCount;
  }

  /**
   * Applies one logging level to this class's logger and to the loggers of
   * ParserFactory and PluginRepository, then announces the level at INFO.
   *
   * @param level the level to log at
   */
  public static void setLogLevel(Level level) {
    ParserFactory.LOG.setLevel(level);
    PluginRepository.LOG.setLevel(level);
    LOG.setLevel(level);
    LOG.info("logging at " + level);
  }

  /**
   * Chooses whether sort() and save() delete the intermediate file they
   * have just consumed.
   *
   * @param clean true to delete intermediates, false to keep them
   */
  public void setClean(boolean clean) {
    this.clean = clean;
  }

  /**
   * Display the status of the parser run: totals first, then rates.
   *
   * <p>Rates are reported as 0 when their divisor (elapsed time or page
   * count) is zero, instead of logging NaN or Infinity.
   */
  public void status() {
    long ms = System.currentTimeMillis() - start;
    LOG.info("status: "
             + pages + " pages, "
             + errors + " errors, "
             + bytes + " bytes, "
             + ms + " ms");

    float secs = ms/1000.0f;
    float pagesPerSec = 0.0f;
    float kbPerSec = 0.0f;
    if (secs > 0.0f) {                            // avoid division by zero
      pagesPerSec = ((float)pages)/secs;
      kbPerSec = ((float)bytes*8/1024)/secs;
    }
    float bytesPerPage = 0.0f;
    if (pages > 0)                                // avoid 0/0 = NaN
      bytesPerPage = ((float)bytes)/pages;

    LOG.info("status: "
             + pagesPerSec+" pages/s, "
             + kbPerSec+" kb/s, "
             + bytesPerPage + " bytes/page");
  }

  /**
   * Parse contents by multiple threads and save as unsorted ParserOutput.
   *
   * <p>Opens the fetcher-output and content readers (and, unless this is a
   * dry run, the unsorted output writer), starts <code>threadCount</code>
   * ParserThreads and polls once a second until the thread group reports no
   * active threads. The readers and the writer are closed on every exit
   * path, including the failure ones.
   *
   * @throws IOException if a file cannot be opened or closed
   * @throws InterruptedException if interrupted while waiting for the threads
   * @throws RuntimeException if a SEVERE message was logged during the run
   */
  public void parse() throws IOException, InterruptedException {

    try {
      fetcherNPReader = new ArrayFile.Reader
        (nfs, (new File(directory, FetcherOutput.DIR_NAME_NP)).getPath());
      contentReader = new ArrayFile.Reader
        (nfs, (new File(directory, Content.DIR_NAME)).getPath());

      if (!this.dryRun) {
        parserOutputWriter = new SequenceFile.Writer
          (nfs, unsortedFile.getPath(), LongWritable.class, ParserOutput.class);
      }

      start = System.currentTimeMillis();

      for (int i = 0; i < threadCount; i++) {     // spawn threads
        ParserThread thread = new ParserThread();
        thread.start();
      }

      do {
        Thread.sleep(1000);

        if (LogFormatter.hasLoggedSevere())
          throw new RuntimeException("SEVERE error logged.  Exiting parser.");

      } while (group.activeCount() > 0);          // wait for threads to finish

    } finally {
      // release the files whether we finished normally or are bailing out
      closeParseFiles();
    }

    status();                                     // print final status
  }

  /**
   * Closes whatever parse() managed to open. Each close is attempted even
   * if an earlier one throws.
   */
  private void closeParseFiles() throws IOException {
    try {
      if (fetcherNPReader != null)
        fetcherNPReader.close();
    } finally {
      try {
        if (contentReader != null)
          contentReader.close();
      } finally {
        if (!this.dryRun && parserOutputWriter != null)
          parserOutputWriter.close();
      }
    }
  }

  /**
   * Sorts the unsorted ParserOutput file by entry number into the sorted
   * file, logs the throughput, and deletes the unsorted file if cleaning
   * is enabled. Does nothing in dry-run mode.
   *
   * @throws IOException if sorting or deleting fails
   */
  public void sort() throws IOException {

    if (!this.dryRun) {

      LOG.info("Sorting ParserOutput");

      start = System.currentTimeMillis();

      String from = unsortedFile.getPath();
      String to = sortedFile.getPath();
      new SequenceFile.Sorter
        (nfs, new LongWritable.Comparator(), ParserOutput.class)
        .sort(from, to);

      // every page, parsed or errored, counts as one entry
      double elapsedSecs = (System.currentTimeMillis() - start) / 1000.0;
      int total = pages+errors;
      LOG.info("Sorted: " + total + " entries in " + elapsedSecs + "s, "
        + (total/elapsedSecs) + " entries/s");

      if (this.clean) {
        LOG.info("Deleting intermediate "+unsortedFile.getName());
        nfs.delete(unsortedFile);
      }
    }
  }

  /**
   * Split sorted ParserOutput into ParseData and ParseText,
   * and generate new FetcherOutput with updated status.
   *
   * <p>The sorted ParserOutput file and the no-parsing FetcherOutput are
   * read in lock step. For every entry the FetcherOutput status is set to
   * SUCCESS or CANT_PARSE according to the parse status (left untouched if
   * the fetch itself had failed) and written to the fetcher output, while
   * the ParseData and ParseText go to their own files. Does nothing in
   * dry-run mode. All files are closed on every exit path.
   *
   * @throws IOException on I/O failure, if the two inputs do not line up
   *   entry by entry, or if the number of entries saved differs from
   *   pages + errors counted during parsing
   */
  public void save() throws IOException {

    if (this.dryRun)
      return;

    LOG.info("Saving ParseData and ParseText separately");

    start = System.currentTimeMillis();

    // declared up front so the finally block can close whatever was opened
    SequenceFile.Reader parserOutputReader = null;
    ArrayFile.Reader npReader = null;
    ArrayFile.Writer fetcherWriter = null;
    ArrayFile.Writer parseDataWriter = null;
    ArrayFile.Writer parseTextWriter = null;

    try {
      parserOutputReader
        = new SequenceFile.Reader(nfs, sortedFile.getPath());

      npReader = new ArrayFile.Reader(nfs,
        (new File(directory, FetcherOutput.DIR_NAME_NP)).getPath());

      fetcherWriter = new ArrayFile.Writer(nfs,
        (new File(directory, FetcherOutput.DIR_NAME)).getPath(),
        FetcherOutput.class);

      parseDataWriter = new ArrayFile.Writer(nfs,
        (new File(directory, ParseData.DIR_NAME)).getPath(), ParseData.class);
      parseTextWriter = new ArrayFile.Writer(nfs,
        (new File(directory, ParseText.DIR_NAME)).getPath(), ParseText.class);

      LongWritable key = new LongWritable();
      ParserOutput val = new ParserOutput();
      FetcherOutput fo = new FetcherOutput();
      int count = 0;
      while (parserOutputReader.next(key,val)) {
        // safe guarding: the fetcher output must have this very entry
        if (npReader.next(fo) == null || npReader.key() != key.get())
          throw new IOException("Mismatch between entries under "
            + FetcherOutput.DIR_NAME_NP + " and in " + sortedFile.getName());
        // reset status in fo (FetcherOutput), using status in ParserOutput
        switch (val.getStatus()) {
        case ParserOutput.SUCCESS:
          fo.setStatus(FetcherOutput.SUCCESS);
          break;
        case ParserOutput.UNKNOWN:
        case ParserOutput.FAILURE:
          fo.setStatus(FetcherOutput.CANT_PARSE);
          break;
        case ParserOutput.NOFETCH:
        default:
          // do not reset: keep the status recorded by the fetcher
          break;
        }
        fetcherWriter.append(fo);
        parseDataWriter.append(val.getParseData());
        parseTextWriter.append(val.getParseText());
        count++;
      }
      // safe guard! make sure there are identical entries
      // in (fetcher, content) and in (parseData, parseText)
      if (count != (pages+errors))
        throw new IOException("Missing entries: expect "+(pages+errors)
          +", but have "+count+" entries instead.");
    } finally {
      // attempt every close even if an earlier one throws
      try {
        if (npReader != null)
          npReader.close();
      } finally {
        try {
          if (fetcherWriter != null)
            fetcherWriter.close();
        } finally {
          try {
            if (parseDataWriter != null)
              parseDataWriter.close();
          } finally {
            try {
              if (parseTextWriter != null)
                parseTextWriter.close();
            } finally {
              if (parserOutputReader != null)
                parserOutputReader.close();
            }
          }
        }
      }
    }

    double localSecs = (System.currentTimeMillis() - start) / 1000.0;
    LOG.info("Saved: " + (pages+errors) + " entries in " + localSecs + "s, "
      + ((pages+errors)/localSecs) + " entries/s");

    if (this.clean) {
      LOG.info("Deleting intermediate "+sortedFile.getName());
      nfs.delete(sortedFile);
    }
  }

  /** main method */
  public static void main(String[] args) throws Exception {
    int threadCount = -1;
    boolean showThreadID = false;
    boolean dryRun = false;
    String logLevel = "info";
    boolean clean = true;
    String directory = null;

    String usage = "Usage: ParseSegment (-local | -ndfs <namenode:port>) [-threads n] 
[-showThreadID] [-dryRun] [-logLevel level] [-noClean] dir";

    if (args.length == 0) {
      System.err.println(usage);
      System.exit(-1);
    }
      
    // parse command line
    NutchFileSystem nfs = NutchFileSystem.parseArgs(args, 0);

    for (int i = 0; i < args.length; i++) {
      if (args[i].equals("-threads")) {
        threadCount =  Integer.parseInt(args[++i]);
      } else if (args[i].equals("-showThreadID")) {
        showThreadID = true;
      } else if (args[i].equals("-dryRun")) {
        dryRun = true;
      } else if (args[i].equals("-logLevel")) {
        logLevel = args[++i];
      } else if (args[i].equals("-noClean")) {
        clean = false;
      } else if (i != args.length-1) {
        System.err.println(usage);
        System.exit(-1);
      } else {
        directory = args[i];
      }
    }

    try {

      ParseSegment parseSegment = new ParseSegment(nfs, directory, dryRun);

      parseSegment.setLogLevel
        (Level.parse((new String(logLevel)).toUpperCase()));

      if (threadCount != -1)
        parseSegment.setThreadCount(threadCount);
      if (showThreadID)
        LogFormatter.setShowThreadIDs(showThreadID);

      parseSegment.setClean(clean);

      parseSegment.parse();
      parseSegment.sort();
      parseSegment.save();

    } finally {
      nfs.close();
    }

  }
}



-------------------------------------------------------
This SF.net email is sponsored by: IT Product Guide on ITManagersJournal
Use IT products in your business? Tell us what you think of them. Give us
Your Opinions, Get Free ThinkGeek Gift Certificates! Click to find out more
http://productguide.itmanagersjournal.com/guidepromo.tmpl
_______________________________________________
Nutch-cvs mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/nutch-cvs

Reply via email to