Update of /cvsroot/nutch/nutch/src/java/net/nutch/segment
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv13664

Added Files:
        SegmentReader.java SegmentSlicer.java SegmentWriter.java 
Log Message:
Add a high-level API for working with segment data:

* SegmentReader: read and optionally fix segment data. Report or dump
  segment content. This class is a superset of the DumpSegment tool.

* SegmentWriter: high-level API for writing segment data.

* SegmentSlicer: high-level API for copying, appending and slicing
  segment data.

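To see how these classes fit together, here is a minimal usage sketch that copies all entries from one segment into a freshly created one. The wrapper class name and segment paths are hypothetical; the sources below are the authoritative API.

import java.io.File;

import net.nutch.fetcher.FetcherOutput;
import net.nutch.parse.ParseData;
import net.nutch.parse.ParseText;
import net.nutch.protocol.Content;
import net.nutch.segment.SegmentReader;
import net.nutch.segment.SegmentWriter;
import net.nutch.util.LocalFileSystem;
import net.nutch.util.NutchFileSystem;

public class SegmentCopyExample {
  public static void main(String[] args) throws Exception {
    NutchFileSystem nfs = new LocalFileSystem();
    // Hypothetical paths - adjust to the local segment layout.
    File in = new File("segments/20040901120000");
    File out = new File("segments-copy", SegmentWriter.getNewSegmentName());

    // Read everything (content, parse_text, parse_data), no auto-fix.
    SegmentReader reader = new SegmentReader(nfs, in, true, true, true, false);
    // force=true: overwrite conflicting data in the output dir if present.
    SegmentWriter writer = new SegmentWriter(nfs, out, true, true, true, true);

    FetcherOutput fo = new FetcherOutput();
    Content co = new Content();
    ParseText pt = new ParseText();
    ParseData pd = new ParseData();
    for (long i = 0; i < reader.size; i++) {
      if (!reader.next(fo, co, pt, pd)) break;  // stop on the first unreadable entry
      writer.append(fo, co, pt, pd);
    }
    reader.close();
    writer.close();
  }
}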

--- NEW FILE: SegmentSlicer.java ---
/* Copyright (c) 2004 The Nutch Organization.  All rights reserved.   */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */

package net.nutch.segment;

import java.io.File;
import java.io.FileFilter;
import java.util.Vector;
import java.util.logging.Logger;

import net.nutch.fetcher.FetcherOutput;
import net.nutch.parse.ParseData;
import net.nutch.parse.ParseText;
import net.nutch.protocol.Content;
import net.nutch.util.LogFormatter;
import net.nutch.util.NutchFileSystem;

/**
 * This class reads data from one or more input segments, and outputs it to one
 * or more output segments, optionally deleting the input segments when it's
 * finished.
 * 
 * <p>Data is read sequentially from the input segments and appended to an output
 * segment until it reaches the target count of entries, at which point the next
 * output segment is created, and so on.</p>
 * <p>NOTE 1: this tool does NOT de-duplicate data - use SegmentMergeTool for that.</p>
 * <p>NOTE 2: this tool does NOT copy indexes. It is currently impossible to slice
 * Lucene indexes. The proper procedure is first to create slices, and then to index
 * them.</p>
 * @author Andrzej Bialecki &lt;[EMAIL PROTECTED]&gt;
 */
public class SegmentSlicer implements Runnable {
  public static final Logger LOG = LogFormatter.getLogger("net.nutch.segment.SegmentSlicer");
  public static int LOG_STEP = 20000;
  
  private NutchFileSystem nfs = null;
  private File[] input = null;
  private File output = null;
  private boolean withContent = true;
  private boolean withParseData = true;
  private boolean withParseText = true;
  private boolean autoFix = false;
  private long maxCount = Long.MAX_VALUE;
  
  /**
   * Create new SegmentSlicer.
   * @param nfs filesystem
   * @param input list of input segments
   * @param output output directory, created if it does not exist. Output segments
   * will be created inside this directory
   * @param withContent if true, read content, otherwise ignore it
   * @param withParseText if true, read parse_text, otherwise ignore it
   * @param withParseData if true, read parse_data, otherwise ignore it
   * @param autoFix if true, attempt to fix corrupt segments
   * @param maxCount if greater than 0, determines the maximum number of entries
   * per output segment. Multiple output segments will be created as needed.
   */
  public SegmentSlicer(NutchFileSystem nfs, File[] input, File output,
          boolean withContent, boolean withParseText, boolean withParseData,
          boolean autoFix, long maxCount) {
    this.nfs = nfs;
    this.input = input;
    this.output = output;
    this.withContent = withContent;
    this.withParseData = withParseData;
    this.withParseText = withParseText;
    this.autoFix = autoFix;
    if (maxCount > 0) this.maxCount = maxCount;
  }

  /** Run the slicer. */
  public void run() {
    long start = System.currentTimeMillis();
    Vector readers = new Vector();
    long total = 0L;
    for (int i = 0; i < input.length; i++) {
      SegmentReader sr = null;
      try {
        sr = new SegmentReader(nfs, input[i], withContent, withParseText, withParseData, autoFix);
      } catch (Exception e) {
        LOG.warning(e.getMessage());
        continue;
      }
      total += sr.size;
      readers.add(sr);
    }
    LOG.info("Input: " + total + " entries in " + readers.size() + " 
segments.");
    FetcherOutput fo = new FetcherOutput();
    Content co = new Content();
    ParseData pd = new ParseData();
    ParseText pt = new ParseText();
    long outputCnt = 0L;
    int segCnt = 1;
    File outDir = new File(output, SegmentWriter.getNewSegmentName());
    LOG.info("Writing output in " + output);
    try {
      LOG.info(" - starting first output segment in " + outDir.getName());
      SegmentWriter sw = new SegmentWriter(nfs,
            outDir, true, withContent, withParseText, withParseData);
      long delta = System.currentTimeMillis();
      for (int i = 0; i < readers.size(); i++) {
        SegmentReader sr = (SegmentReader)readers.get(i);
        for (long k = 0L; k < sr.size; k++) {
          try {
            if (!sr.next(fo, co, pt, pd)) break;
          } catch (Throwable t) {
            LOG.warning(" - error reading entry #" + k + " from " + 
sr.segmentDir.getName());
            break;
          }
          sw.append(fo, co, pt, pd);
          outputCnt++;
          if (outputCnt % LOG_STEP == 0) {
            LOG.info(" Processed " + outputCnt + " entries (" +
                    (float)LOG_STEP / (float)(System.currentTimeMillis() - delta) * 1000.0f + " rec/s)");
            delta = System.currentTimeMillis();
          }
          if (outputCnt % maxCount == 0) {
            sw.close();
            outDir = new File(output, SegmentWriter.getNewSegmentName());
            segCnt++;
            LOG.info(" - starting next output segment in " + outDir.getName());
            sw = new SegmentWriter(nfs, outDir,
                    true, withContent, withParseText, withParseData);
          }
        }
        sr.close();
      }
      sw.close();
      delta = System.currentTimeMillis() - start;
      float eps = (float) outputCnt / (float) (delta / 1000);
      LOG.info("DONE segment slicing, INPUT: " + total + " -> OUTPUT: " + 
outputCnt + " entries in "
              + segCnt + " segment(s), " + ((float) delta / 1000f) + " s (" + 
eps + " entries/sec).");
    } catch (Throwable t) {
      t.printStackTrace();
      LOG.info("Unexpected error " + t.getMessage() + ", aborting at " + 
outputCnt + " output entries.");
    }
  }
  
  /** Command-line wrapper. Run without arguments to see usage help. */
  public static void main(String[] args) throws Exception {
    if (args.length == 0) {
      usage();
      return;
    }
    String segDir = null;
    String outDir = null;
    Vector dirs = new Vector();
    boolean fix = false;
    long maxCount = Long.MAX_VALUE;
    boolean withParseText = true;
    boolean withParseData = true;
    boolean withContent = true;
    NutchFileSystem nfs = NutchFileSystem.parseArgs(args, 0);
    for (int i = 0; i < args.length; i++) {
      if (args[i] != null) {
        if (args[i].equals("-noparsetext")) withParseText = false;
        else if (args[i].equals("-noparsedata")) withParseData = false;
        else if (args[i].equals("-nocontent")) withContent = false;
        else if (args[i].equals("-fix")) fix = true;
        else if (args[i].equals("-dir")) segDir = args[++i];
        else if (args[i].equals("-o")) outDir = args[++i];
        else if (args[i].equals("-max")) {
          String cnt = args[++i];
          try {
            maxCount = Long.parseLong(cnt);
          } catch (Exception e) {
            LOG.warning("Invalid count '" + cnt + "', setting to 
Long.MAX_VALUE.");
          }
        } else dirs.add(new File(args[i]));
      }
    }
    if (outDir == null) {
      LOG.severe("Missing output path.");
      usage();
      return;
    }
    if (segDir != null) {
      File sDir = new File(segDir);
      if (!sDir.exists() || !sDir.isDirectory()) {
        LOG.warning("Invalid path: " + sDir);
      } else {
        File[] files = sDir.listFiles(new FileFilter() {
          public boolean accept(File f) {
            return f.isDirectory();
          }
        });
        if (files != null && files.length > 0) {
          for (int i = 0; i < files.length; i++) dirs.add(files[i]);
        }
      }
    }
    if (dirs.size() == 0) {
      LOG.severe("No input segment dirs.");
      usage();
      return;
    }
    File[] input = (File[])dirs.toArray(new File[0]);
    File output = new File(outDir);
    SegmentSlicer slicer = new SegmentSlicer(nfs, input, output,
            withContent, withParseText, withParseData, fix, maxCount);
    slicer.run();
  }

  private static void usage() {
    System.err.println("SegmentSlicer -o outputDir [-max count] [-fix] 
[-nocontent] [-noparsedata] [-noparsetext] (-dir segments | seg1 seg2 ...)");
    System.err.println("\tNOTE: at least one segment dir name is required, or 
'-dir' option.");
    System.err.println("\t      outputDir is always required.");
    System.err.println("\t-o outputDir\toutput directory for segments");
    System.err.println("\t-max count\t(optional) output multiple segments, each 
with maximum 'count' entries");
    System.err.println("\t-fix\t\t(optional) automatically fix corrupted 
segments");
    System.err.println("\t-nocontent\t(optional) ignore content data");
    System.err.println("\t-noparsedata\t(optional) ignore parse_data data");
    System.err.println("\t-nocontent\t(optional) ignore parse_text data");
    System.err.println("\t-dir segments\tdirectory containing multiple 
segments");
    System.err.println("\tseg1 seg2 ...\tsegment directories\n");
  }
}
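
An illustrative sketch of driving the slicer from Java rather than from the command line, using the constructor documented above. The directory names and wrapper class are hypothetical.

import java.io.File;

import net.nutch.segment.SegmentSlicer;
import net.nutch.util.LocalFileSystem;
import net.nutch.util.NutchFileSystem;

public class SliceExample {
  public static void main(String[] args) throws Exception {
    NutchFileSystem nfs = new LocalFileSystem();
    // Hypothetical input segments and output parent directory.
    File[] input = {
        new File("segments/20040901120000"),
        new File("segments/20040902120000")
    };
    File output = new File("segments-sliced");
    // Keep content/parse_text/parse_data, no auto-fix, at most 100000 entries
    // per output segment (roughly equivalent to: -o segments-sliced -max 100000 ...).
    SegmentSlicer slicer = new SegmentSlicer(nfs, input, output,
            true, true, true, false, 100000L);
    slicer.run();
  }
}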

--- NEW FILE: SegmentWriter.java ---
/* Copyright (c) 2004 The Nutch Organization.  All rights reserved.   */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */

package net.nutch.segment;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.logging.Logger;

import net.nutch.fetcher.FetcherOutput;
import net.nutch.io.ArrayFile;
import net.nutch.parse.ParseData;
import net.nutch.parse.ParseText;
import net.nutch.protocol.Content;
import net.nutch.util.LocalFileSystem;
import net.nutch.util.LogFormatter;
import net.nutch.util.NutchFileSystem;

/**
 * This class holds together all data writers for a new segment.
 * Some convenience methods are also provided, to append to the segment.
 * 
 * @author Andrzej Bialecki &lt;[EMAIL PROTECTED]&gt;
 */
public class SegmentWriter {
  public static final Logger LOG = LogFormatter.getLogger("net.nutch.segment.SegmentWriter");
  
  public ArrayFile.Writer fetcherWriter;
  public ArrayFile.Writer contentWriter;
  public ArrayFile.Writer parseTextWriter;
  public ArrayFile.Writer parseDataWriter;

  public long size = 0L;
  
  public File segmentDir;

  public SegmentWriter(File dir, boolean force) throws Exception {
    this(new LocalFileSystem(), dir, force, true, true, true);
  }
  
  public SegmentWriter(NutchFileSystem nfs, File dir, boolean force) throws Exception {
    this(nfs, dir, force, true, true, true);
  }
  
  /**
   * Open a segment for writing. When a segment is open, its data files are created.
   * 
   * @param nfs NutchFileSystem to use
   * @param dir directory to contain the segment data
   * @param force if true, and the segment directory already exists and its content
   *        is in the way, silently overwrite that content as needed.
   *        If false and the above condition arises, an Exception is thrown. Note: no
   *        Exception is thrown if force=false and the target directory already exists
   *        but contains other data that does not conflict with the segment data.
   * @param withContent if true, write Content, otherwise ignore it
   * @param withParseText if true, write ParseText, otherwise ignore it
   * @param withParseData if true, write ParseData, otherwise ignore it
   * @throws Exception
   */
  public SegmentWriter(NutchFileSystem nfs, File dir, boolean force,
          boolean withContent, boolean withParseText, boolean withParseData) throws Exception {
    segmentDir = dir;
    if (!nfs.exists(segmentDir)) {
      nfs.mkdirs(segmentDir);
    }
    File out = new File(segmentDir, FetcherOutput.DIR_NAME);
    if (nfs.exists(out) && !force) {
      throw new Exception("Output directory " + out + " already exists.");
    }
    fetcherWriter = new ArrayFile.Writer(nfs, out.toString(), FetcherOutput.class);
    if (withContent) {
      out = new File(dir, Content.DIR_NAME);
      if (nfs.exists(out) && !force) {
        throw new Exception("Output directory " + out + " already exists.");
      }
      contentWriter = new ArrayFile.Writer(nfs, out.toString(), Content.class);
    }
    if (withParseText) {
      out = new File(dir, ParseText.DIR_NAME);
      if (nfs.exists(out) && !force) {
        throw new Exception("Output directory " + out + " already exists.");
      }
      parseTextWriter = new ArrayFile.Writer(nfs, out.toString(), ParseText.class);
    }
    if (withParseData) {
      out = new File(dir, ParseData.DIR_NAME);
      if (nfs.exists(out) && !force) {
        throw new Exception("Output directory " + out + " already exists.");
      }
      parseDataWriter = new ArrayFile.Writer(nfs, out.toString(), ParseData.class);
    }
  }

  /** Create a new segment name */
  public static String getNewSegmentName() {
    return new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System.currentTimeMillis()));
  }

  /** Sets the index interval for all segment writers. */
  public synchronized void setIndexInterval(int interval) throws IOException {
    fetcherWriter.setIndexInterval(interval);
    if (contentWriter != null) contentWriter.setIndexInterval(interval);
    if (parseTextWriter != null) parseTextWriter.setIndexInterval(interval);
    if (parseDataWriter != null) parseDataWriter.setIndexInterval(interval);
  }

  /** Append one entry (FetcherOutput plus optional Content, ParseText, ParseData) to the segment. */
  public synchronized void append(FetcherOutput fo, Content co, ParseText pt, ParseData pd) throws IOException {
    fetcherWriter.append(fo);
    if (contentWriter != null) contentWriter.append(co);
    if (parseTextWriter != null) parseTextWriter.append(pt);
    if (parseDataWriter != null) parseDataWriter.append(pd);
    size++;
  }
  
  /** Close all writers. */
  public void close() {
    try {
      fetcherWriter.close();
    } catch (Exception e) {
      LOG.fine("Exception closing fetcherWriter: " + e.getMessage());
    }
    if (contentWriter != null) try {
      contentWriter.close();
    } catch (Exception e) {
      LOG.fine("Exception closing contentWriter: " + e.getMessage());
    }
    if (parseTextWriter != null) try {
      parseTextWriter.close();
    } catch (Exception e) {
      LOG.fine("Exception closing parseTextWriter: " + e.getMessage());
    }
    if (parseDataWriter != null) try {
      parseDataWriter.close();
    } catch (Exception e) {
      LOG.fine("Exception closing parseDataWriter: " + e.getMessage());
    }
  }

  public static void main(String[] args) {}
}
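
A small illustrative sketch of the writer on its own: create a timestamped segment under a parent directory, skip the content writer, and set the index interval. The directory name and wrapper class are hypothetical, and no entries are appended here.

import java.io.File;

import net.nutch.segment.SegmentWriter;
import net.nutch.util.LocalFileSystem;

public class WriterExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical parent dir; a timestamped segment name is generated under it.
    File dir = new File("segments-out", SegmentWriter.getNewSegmentName());
    // force=true; keep parse_text and parse_data, but skip content entirely
    // (withContent=false), so no content ArrayFile is created.
    SegmentWriter writer = new SegmentWriter(new LocalFileSystem(), dir,
            true, false, true, true);
    writer.setIndexInterval(128);  // index every 128th entry in each ArrayFile
    // For each entry one would now call:
    //   writer.append(fo, co, pt, pd);
    // where co is ignored here because the content writer was not opened.
    writer.close();
  }
}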

--- NEW FILE: SegmentReader.java ---
/* Copyright (c) 2004 The Nutch Organization.  All rights reserved.   */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */

package net.nutch.segment;

import java.io.EOFException;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.io.PrintStream;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Vector;
import java.util.logging.Logger;

import net.nutch.fetcher.FetcherOutput;
import net.nutch.io.ArrayFile;
import net.nutch.io.LongWritable;
import net.nutch.io.MapFile;
import net.nutch.io.SequenceFile;
import net.nutch.io.UTF8;
import net.nutch.pagedb.FetchListEntry;
import net.nutch.parse.ParseData;
import net.nutch.parse.ParseText;
import net.nutch.protocol.Content;
import net.nutch.util.LocalFileSystem;
import net.nutch.util.LogFormatter;
import net.nutch.util.NutchFileSystem;

/**
 * This class holds together all data readers for an existing segment.
 * Some convenience methods are also provided, to read from the segment and
 * to reposition the current pointer.
 * 
 * @author Andrzej Bialecki &lt;[EMAIL PROTECTED]&gt;
 */
public class SegmentReader {
  public static final Logger LOG = LogFormatter.getLogger("net.nutch.segment.SegmentReader");
  
  public ArrayFile.Reader fetcherReader;
  public ArrayFile.Reader contentReader;
  public ArrayFile.Reader parseTextReader;
  public ArrayFile.Reader parseDataReader;

  /**
   * The time when fetching of this segment started, as recorded
   * in fetcher output data.
   */
  public long started = 0L;
  /**
   * The time when fetching of this segment finished, as recorded
   * in fetcher output data.
   */
  public long finished = 0L;
  public long size = 0L;
  private long key = -1L;

  
  public File segmentDir;
  public NutchFileSystem nfs;

  /**
   * Open a segment for reading. If the segment is corrupted, do not attempt to fix it.
   * @param dir directory containing segment data
   * @throws Exception
   */
  public SegmentReader(File dir) throws Exception {
    this(new LocalFileSystem(), dir, true, true, true, false);
  }
  
  /**
   * Open a segment for reading. If the segment is corrupted, do not attempt to fix it.
   * @param nfs filesystem
   * @param dir directory containing segment data
   * @throws Exception
   */
  public SegmentReader(NutchFileSystem nfs, File dir) throws Exception {
    this(nfs, dir, true, true, true, false);
  }
  
  /**
   * Open a segment for reading.
   * @param dir directory containing segment data
   * @param autoFix if true, and the segment is corrupted, attempt to 
   * fix errors and try to open it again. If the segment is corrupted, and
   * autoFix is false, or it was not possible to correct errors, an Exception is
   * thrown.
   * @throws Exception
   */
  public SegmentReader(File dir, boolean autoFix) throws Exception {
    this(new LocalFileSystem(), dir, true, true, true, autoFix);
  }
  
  /**
   * Open a segment for reading.
   * @param nfs filesystem
   * @param dir directory containing segment data
   * @param autoFix if true, and the segment is corrupted, attempt to 
   * fix errors and try to open it again. If the segment is corrupted, and
   * autoFix is false, or it was not possible to correct errors, an Exception is
   * thrown.
   * @throws Exception
   */
  public SegmentReader(NutchFileSystem nfs, File dir, boolean autoFix) throws Exception {
    this(nfs, dir, true, true, true, autoFix);
  }
  
  /**
   * Open a segment for reading. When a segment is open, its total size is checked
   * and cached in this class - however, only by actually reading the entries can one
   * be sure about the exact number of valid, non-corrupt entries.
   * 
   * @param nfs NutchFileSystem to use
   * @param dir directory containing segment data
   * @param withContent if true, read Content, otherwise ignore it
   * @param withParseText if true, read ParseText, otherwise ignore it
   * @param withParseData if true, read ParseData, otherwise ignore it
   * @param autoFix if true, and the segment is corrupt, try to automatically fix it.
   * If this parameter is false, and the segment is corrupt or fixing was unsuccessful,
   * an Exception is thrown.
   * @throws Exception
   */
  public SegmentReader(NutchFileSystem nfs, File dir,
          boolean withContent, boolean withParseText, boolean withParseData,
          boolean autoFix) throws Exception {
    try {
      init(nfs, dir, withContent, withParseText, withParseData);
    } catch (Exception e) {
      boolean ok = false;
      if (autoFix) {
        // corrupt segment, attempt to fix
        ok = fixSegment(nfs, dir, withContent, withParseText, withParseData, false);
      }
      if (ok)
        init(nfs, dir, withContent, withParseText, withParseData);
      else throw new Exception("Segment " + dir + " is corrupted.");
    }
  }

  /**
   * Attempt to fix a partially corrupted segment. Currently this means just
   * fixing broken MapFiles, using the
   * {@link MapFile#fix(NutchFileSystem, File, Class, Class, boolean)} method.
   * @param nfs filesystem
   * @param dir segment directory
   * @param withContent if true, fix content, otherwise ignore it
   * @param withParseText if true, fix parse_text, otherwise ignore it
   * @param withParseData if true, fix parse_data, otherwise ignore it
   * @param dryrun if true, only show what would be done without performing any actions
   * @return true if fixing (or the dry run) completed without errors, false otherwise
   */
  public static boolean fixSegment(NutchFileSystem nfs, File dir, 
          boolean withContent, boolean withParseText, boolean withParseData,
          boolean dryrun) {
    String dr = "";
    if (dryrun) dr = "[DRY RUN] ";
    File content = new File(dir, Content.DIR_NAME);
    File fetcherOutput = new File(dir, FetcherOutput.DIR_NAME);
    File parseData = new File(dir, ParseData.DIR_NAME);
    File parseText = new File(dir, ParseText.DIR_NAME);
    long cnt = 0L;
    try {
      cnt = MapFile.fix(nfs, fetcherOutput, LongWritable.class, FetcherOutput.class, dryrun);
      if (cnt != -1) LOG.info(dr + " - fixed " + fetcherOutput.getName());
      if (withContent) {
        cnt = MapFile.fix(nfs, content, LongWritable.class, Content.class, dryrun);
        if (cnt != -1) LOG.info(dr + " - fixed " + content.getName());
      }
      if (withParseData) {
        cnt = MapFile.fix(nfs, parseData, LongWritable.class, ParseData.class, dryrun);
        if (cnt != -1) LOG.info(dr + " - fixed " + parseData.getName());
      }
      if (withParseText) {
        cnt = MapFile.fix(nfs, parseText, LongWritable.class, ParseText.class, dryrun);
        if (cnt != -1) LOG.info(dr + " - fixed " + parseText.getName());
      }
      }
      LOG.info(dr + "Finished fixing " + dir.getName());
      return true;
    } catch (Throwable t) {
      LOG.warning(dr + "Unable to fix segment " + dir.getName() + ": " + 
t.getMessage());
      return false;
    }
  }

  private void init(NutchFileSystem nfs, File dir,
          boolean withContent, boolean withParseText, boolean withParseData) throws Exception {
    segmentDir = dir;
    this.nfs = nfs;
    fetcherReader = new ArrayFile.Reader(nfs, new File(dir, FetcherOutput.DIR_NAME).toString());
    if (withContent) contentReader = new ArrayFile.Reader(nfs, new File(dir, Content.DIR_NAME).toString());
    if (withParseText) parseTextReader = new ArrayFile.Reader(nfs, new File(dir, ParseText.DIR_NAME).toString());
    if (withParseData) parseDataReader = new ArrayFile.Reader(nfs, new File(dir, ParseData.DIR_NAME).toString());
    // count the number of valid entries.
    // XXX We assume that all other data files contain the
    // XXX same number of valid entries - which is not always
    // XXX true if Fetcher crashed in the middle of update.
    // XXX One should check for this later, when actually
    // XXX reading the entries.
    FetcherOutput fo = new FetcherOutput();
    fetcherReader.next(fo);
    started = fo.getFetchDate();
    LongWritable w = new LongWritable();
    w.set(++size);
    try {
      while (fetcherReader.seek(w)) {
        w.set(++size);
      }
    } catch (EOFException eof) {
      // the file is truncated - probably due to a crashed fetcher.
      // Use just the part that we can...
      LOG.warning(" - data in segment " + dir + " is corrupt, using only " + 
size + " entries.");
    }
    fetcherReader.seek(size - 2);
    fetcherReader.next(fo);
    finished = fo.getFetchDate();
    // reposition to the start
    fetcherReader.reset();
  }

  /**
   * Get a specified entry from the segment. Note: even if some of the storage objects
   * are null, if the respective readers are open a seek(n) operation is performed on them
   * anyway, to ensure that the whole entry is valid.
   * 
   * @param n position of the entry
   * @param fo storage for FetcherOutput data. Must not be null.
   * @param co storage for Content data, or null.
   * @param pt storage for ParseText data, or null.
   * @param pd storage for ParseData data, or null.
   * @return true if all requested data was successfully read, false otherwise
   * @throws IOException
   */
  public synchronized boolean get(long n, FetcherOutput fo, Content co,
          ParseText pt, ParseData pd) throws IOException {
    //XXX a trivial implementation would be to do the following:
    //XXX   seek(n);
    //XXX   return next(fo, co, pt, pd);
    //XXX However, get(long, Writable) may be more optimized
    boolean valid = true;
    if (fetcherReader.get(n, fo) == null) valid = false;
    if (contentReader != null) {
      if (co != null) {
        if (contentReader.get(n, co) == null) valid = false;
      } else contentReader.seek(n);
    }
    if (parseTextReader != null) {
      if (pt != null) {
        if (parseTextReader.get(n, pt) == null) valid = false;
      } else parseTextReader.seek(n);
    }
    if (parseDataReader != null) {
      if (pd != null) {
        if (parseDataReader.get(n, pd) == null) valid = false;
      } else parseDataReader.seek(n);
    }
    key = n;
    return valid;
  }
  
  private Content _co = new Content();
  private ParseText _pt = new ParseText();
  private ParseData _pd = new ParseData();
  
  /** Read values from all open readers. */
  public synchronized boolean next(FetcherOutput fo, Content co,
          ParseText pt, ParseData pd) throws IOException {
    boolean valid = true;
    Content rco = (co == null) ? _co : co;
    ParseText rpt = (pt == null) ? _pt : pt;
    ParseData rpd = (pd == null) ? _pd : pd;
    if (fetcherReader.next(fo) == null) valid = false;
    if (contentReader != null)
      if (contentReader.next(rco) == null) valid = false;
    if (parseTextReader != null)
      if (parseTextReader.next(rpt) == null) valid = false;
    if (parseDataReader != null)
      if (parseDataReader.next(rpd) == null) valid = false;
    key++;
    return valid;
  }
  
  /** Seek to a position in all readers. */
  public synchronized void seek(long n) throws IOException {
    fetcherReader.seek(n);
    if (contentReader != null) contentReader.seek(n);
    if (parseTextReader != null) parseTextReader.seek(n);
    if (parseDataReader != null) parseDataReader.seek(n);
    key = n;
  }

  /** Return the current key position. */
  public long key() {
    return key;
  }

  /** Reset all readers. */
  public synchronized void reset() throws IOException {
    fetcherReader.reset();
    if (contentReader != null) contentReader.reset();
    if (parseTextReader != null) parseTextReader.reset();
    if (parseDataReader != null) parseDataReader.reset();
  }

  /** Close all readers. */
  public synchronized void close() {
    try {
      fetcherReader.close();
    } catch (Exception e) {};
    if (contentReader != null) try {
      contentReader.close();
    } catch (Exception e) {};
    if (parseTextReader != null) try {
      parseTextReader.close();
    } catch (Exception e) {};
    if (parseDataReader != null) try {
      parseDataReader.close();
    } catch (Exception e) {};
  }
  
  /**
   * Dump the segment's content in human-readable format.
   * @param sorted if true, sort segment entries by URL (ascending). If false,
   * output entries in the order they occur in the segment.
   * @param output where to dump to
   * @throws Exception
   */
  public synchronized void dump(boolean sorted, PrintStream output) throws Exception {
    reset();
    FetcherOutput fo = new FetcherOutput();
    Content co = new Content();
    ParseData pd = new ParseData();
    ParseText pt = new ParseText();
    long recNo = 0L;
    if (!sorted) {
      while(next(fo, co, pt, pd)) {
        output.println("Recno:: " + recNo++);
        output.println("FetcherOutput::\n" + fo.toString());
        output.println("Content::\n" + co.toString());
        output.println("ParseData::\n"; + pd.toString());
        output.println("ParseText::\n" + pt.toString() + "\n");
      }
    } else {
      File unsortedFile = new File(segmentDir, ".unsorted");
      File sortedFile = new File(segmentDir, ".sorted");
      nfs.delete(unsortedFile);
      nfs.delete(sortedFile);
      SequenceFile.Writer seqWriter = new SequenceFile.Writer(nfs,
              unsortedFile.toString(), UTF8.class, LongWritable.class);
      FetchListEntry fle;
      LongWritable rec = new LongWritable();
      UTF8 url = new UTF8();
      String urlString;
      while (fetcherReader.next(fo) != null) {
        fle = fo.getFetchListEntry();
        urlString = fle.getPage().getURL().toString();
        rec.set(recNo);
        url.set(urlString);
        seqWriter.append(url, rec);
        recNo++;
      }
      seqWriter.close();
      // sort the SequenceFile
      long start = System.currentTimeMillis();

      SequenceFile.Sorter sorter = new SequenceFile.Sorter(nfs,
              new UTF8.Comparator(), LongWritable.class);

      sorter.sort(unsortedFile.toString(), sortedFile.toString());

      float localSecs = (System.currentTimeMillis() - start) / 1000.0f;
      LOG.info(" - sorted: " + recNo + " entries in " + localSecs + "s, "
        + (recNo/localSecs) + " entries/s");

      nfs.delete(unsortedFile);
      SequenceFile.Reader seqReader = new SequenceFile.Reader(nfs, sortedFile.toString());
      while (seqReader.next(url, rec)) {
        recNo = rec.get();
        get(recNo, fo, co, pt, pd);
        output.println("Recno:: " + recNo++);
        output.println("FetcherOutput::\n" + fo.toString());
        output.println("Content::\n" + co.toString());
        output.println("ParseData::\n"; + pd.toString());
        output.println("ParseText::\n" + pt.toString() + "\n");
      }
      seqReader.close();
      nfs.delete(sortedFile);
    }
  }

  /** Command-line wrapper. Run without arguments to see usage help. */
  public static void main(String[] args) throws Exception {
    if (args.length == 0) {
      usage();
      return;
    }
    SegmentReader reader = null;
    NutchFileSystem nfs = NutchFileSystem.parseArgs(args, 0);
    String segDir = null;
    Vector dirs = new Vector();
    boolean fix = false;
    boolean list = false;
    boolean dump = false;
    boolean sorted = false;
    boolean withParseText = true;
    boolean withParseData = true;
    boolean withContent = true;
    for (int i = 0; i < args.length; i++) {
      if (args[i] != null) {
        if (args[i].equals("-noparsetext")) withParseText = false;
        else if (args[i].equals("-noparsedata")) withParseData = false;
        else if (args[i].equals("-nocontent")) withContent = false;
        else if (args[i].equals("-fix")) fix = true;
        else if (args[i].equals("-dump")) dump = true;
        else if (args[i].equals("-dumpsort")) {
          dump = true;
          sorted = true;
        } else if (args[i].equals("-list")) list = true;
        else if (args[i].equals("-dir")) segDir = args[++i];
        else dirs.add(new File(args[i]));
      }
    }
    if (segDir != null) {
      File sDir = new File(segDir);
      if (!sDir.exists() || !sDir.isDirectory()) {
        LOG.warning("Invalid path: " + sDir);
      } else {
        File[] files = sDir.listFiles(new FileFilter() {
          public boolean accept(File f) {
            return f.isDirectory();
          }
        });
        if (files != null && files.length > 0) {
          for (int i = 0; i < files.length; i++) dirs.add(files[i]);
        }
      }
    }
    if (dirs.size() == 0) {
      LOG.severe("No input segment dirs.");
      usage();
      return;
    }
    long total = 0L;
    int cnt = 0;
    SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd'-'HH:mm:ss");
    DecimalFormat df = new DecimalFormat("########");
    df.setParseIntegerOnly(true);
    if (list)
      LOG.info(" STARTED\t\t\tFINISHED\t\tCOUNT\tDIR NAME");
    for (int i = 0; i < dirs.size(); i++) {
      File dir = (File)dirs.get(i);
      try {
        reader = new SegmentReader(nfs, dir,
              withContent, withParseText, withParseData, fix);
        if (list) {
          LOG.info(" " + sdf.format(new Date(reader.started)) +
                  "\t" + sdf.format(new Date(reader.finished)) +
                  "\t" + df.format(reader.size) +
                  "\t" + dir.getName());
        }
        total += reader.size;
        cnt++;
        if (dump) reader.dump(sorted, System.out);
      } catch (Throwable t) {
        LOG.warning(t.getMessage());
      }
    }
    if (list)
      LOG.info("TOTAL: " + total + " entries in " + cnt + " segments.");
  }
  
  private static void usage() {
    System.err.println("SegmentReader [-fix] [-dump] [-dumpsort] [-list] 
[-nocontent] [-noparsedata] [-noparsetext] (-dir segments | seg1 seg2 ...)");
    System.err.println("\tNOTE: at least one segment dir name is required, or 
'-dir' option.");
    System.err.println("\t-fix\t\tautomatically fix corrupted segments");
    System.err.println("\t-dump\t\tdump segment data in human-readable format");
    System.err.println("\t-dumpsort\tdump segment data in human-readable 
format, sorted by URL");
    System.err.println("\t-list\t\tprint useful information about segments");
    System.err.println("\t-nocontent\tignore content data");
    System.err.println("\t-noparsedata\tignore parse_data data");
    System.err.println("\t-nocontent\tignore parse_text data");
    System.err.println("\t-dir segments\tdirectory containing multiple 
segments");
    System.err.println("\tseg1 seg2 ...\tsegment directories\n");
  }
}
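
Finally, an illustrative sketch of the reader used for inspection: dry-run the fixer, then open the segment and print the summary fields that the '-list' option reports. The segment path and wrapper class are hypothetical.

import java.io.File;
import java.util.Date;

import net.nutch.segment.SegmentReader;
import net.nutch.util.LocalFileSystem;
import net.nutch.util.NutchFileSystem;

public class ReaderExample {
  public static void main(String[] args) throws Exception {
    NutchFileSystem nfs = new LocalFileSystem();
    File dir = new File("segments/20040901120000");  // hypothetical segment dir

    // Report what fixing would do, without touching any files (dryrun=true).
    SegmentReader.fixSegment(nfs, dir, true, true, true, true);

    // Open read-only (autoFix=false) and print the cached summary values.
    SegmentReader reader = new SegmentReader(nfs, dir, false);
    System.out.println("entries:  " + reader.size);
    System.out.println("started:  " + new Date(reader.started));
    System.out.println("finished: " + new Date(reader.finished));
    reader.close();
  }
}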


