Doug Cutting wrote:
Andrzej,

I hadn't had time to look at this closely until now. I like it very much, and find it a little surprising. You use Lucene as a file
sorter / b-tree-like package, rather than using SequenceFile.Sorter
and MapFile. Lucene is as efficient, yet is a simpler API.

I attached the latest version, where some parts of the process are
probably more efficient... Still, the final results are inconclusive, which is why I'm reluctant to commit it - it appears that the speed difference becomes less dramatic as the size and the number of the input segments grow...


I never would have guessed that Lucene would make such a good general-purpose database engine, given that it is only really
designed to handle document search!

Well, thanks to its excellent design, I might add... ;-)

--
Best regards,
Andrzej Bialecki

-------------------------------------------------
Software Architect, System Integration Specialist
CEN/ISSS EC Workshop, ECIMF project chair
EU FP6 E-Commerce Expert/Evaluator
-------------------------------------------------
FreeBSD developer (http://www.freebsd.org)

/* Copyright (c) 2004 The Nutch Organization.  All rights reserved.   */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */

package net.nutch.tools;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.logging.Level;
import java.util.logging.Logger;

import net.nutch.fetcher.FetcherOutput;
import net.nutch.indexer.DeleteDuplicates;
import net.nutch.indexer.IndexMerger;
import net.nutch.indexer.IndexSegment;
import net.nutch.io.ArrayFile;
import net.nutch.io.Writable;
import net.nutch.parse.ParseData;
import net.nutch.parse.ParseText;
import net.nutch.protocol.Content;
import net.nutch.util.*;

import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.document.DateField;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermDocs;
import org.apache.lucene.index.TermEnum;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Hits;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;

/**
 * This class cleans up accumulated segments data, and merges them into a single
 * segment, with no duplicates in it.
 * 
 * <p>
 * Contrary to the SegmentMergeTool there are no prerequisites for its correct
 * operation except for a set of already fetched segments. This tool does not
 * use DeleteDuplicates, but creates its own "master" index of all pages in all
 * segments. Then it walks sequentially through this index and picks up only
 * single pages for every unique value of url or hash.
 * </p>
 * <p>
 * The newly created segment is then optionally indexed, so that it can be
 * either merged with more new segments, or used for searching as it is.
 * </p>
 * <p>
 * Old segments may be optionally removed, because all needed data has already
 * been copied to the new merged segment.
 * </p>
 * <p>
 * You may directly run FastSegmentMergeTool, with all options turned on, i.e.
 * to merge segments into the output segment, index it, and then delete the
 * original segments data.
 * </p>
 * 
 * @author Andrzej Bialecki <[EMAIL PROTECTED]>
 */
public class FastSegmentMergeTool {

  public static final Logger LOG = LogFormatter.getLogger("net.nutch.tools.FastSegmentMergeTool");

  /** Path of the directory that contains all input segments. */
  private String segments = null;

  /** Path of the directory in which the merged segment is created. */
  private String output = null;

  /** Input segment directories that could be opened (filled in by {@link #run()}). */
  private List segdirs = null;

  /** Every sub-directory found in the input directory, usable or not. */
  private List allsegdirs = null;

  /** Whether to index the merged segment when merging is done. */
  private boolean runIndexer = false;

  /** Whether to delete the input segments when merging is done. */
  private boolean delSegs = false;

  /** This class holds together all data readers for an existing segment. */
  static class SegmentReader {
    public ArrayFile.Reader fetcherReader;

    public ArrayFile.Reader contentReader;

    public ArrayFile.Reader parseTextReader;

    public ArrayFile.Reader parseDataReader;

    /** Number of entries that could be read from the fetcher output. */
    public long size = 0L;

    public File dir = null;

    /**
     * Opens the four data files of a segment and counts the readable fetcher
     * entries.
     *
     * @param dir the segment directory
     * @throws Exception if any of the data files cannot be opened; readers
     *         opened before the failure are closed again
     */
    public SegmentReader(File dir) throws Exception {
      this.dir = dir;
      boolean opened = false;
      try {
        fetcherReader = new ArrayFile.Reader(new LocalFileSystem(), new File(dir, FetcherOutput.DIR_NAME).toString());
        contentReader = new ArrayFile.Reader(new LocalFileSystem(), new File(dir, Content.DIR_NAME).toString());
        parseTextReader = new ArrayFile.Reader(new LocalFileSystem(), new File(dir, ParseText.DIR_NAME).toString());
        parseDataReader = new ArrayFile.Reader(new LocalFileSystem(), new File(dir, ParseData.DIR_NAME).toString());
        // count the number of valid entries.
        // XXX We assume that all other data files contain the
        // XXX same number of valid entries - which is not always
        // XXX true if Fetcher crashed in the middle of update.
        // XXX We compensate for this later, when actually
        // XXX reading the entries.
        Writable w = new FetcherOutput();
        try {
          while (fetcherReader.next(w) != null) {
            size++;
          }
        } catch (EOFException eof) {
          // the file is truncated - probably due to a crashed fetcher.
          // Use just the part that we can...
          LOG.warning(" - segment " + dir + " is corrupt, using only " + size + " entries.");
        }
        // reposition to the start
        fetcherReader.reset();
        opened = true;
      } finally {
        // do not leak the readers that were opened before a failure
        if (!opened) close();
      }
    }

    /** Closes all readers; failures to close are deliberately ignored. */
    public void close() {
      closeQuietly(fetcherReader);
      closeQuietly(contentReader);
      closeQuietly(parseTextReader);
      closeQuietly(parseDataReader);
    }

    /** Closes a single reader, tolerating null and ignoring any failure. */
    private static void closeQuietly(ArrayFile.Reader reader) {
      if (reader == null) return;
      try {
        reader.close();
      } catch (Exception ignored) {
        // best effort: nothing useful can be done if closing fails
      }
    }

    /** Repositions all four readers to the start of their data. */
    public void reset() throws IOException {
      fetcherReader.reset();
      contentReader.reset();
      parseTextReader.reset();
      parseDataReader.reset();
    }
  }

  /** Open segment readers, keyed by segment directory name. */
  private HashMap readers = new HashMap();

  // writers for the output segment
  private ArrayFile.Writer fetcherWriter;

  private ArrayFile.Writer contentWriter;

  private ArrayFile.Writer parseTextWriter;

  private ArrayFile.Writer parseDataWriter;

  /**
   * Creates a merge tool for all sub-directories of <code>segments</code>.
   *
   * @param segments path to the directory containing the input segments
   * @param output path to the directory that will contain the merged segment
   * @param runIndexer if true, index the merged segment after merging
   * @param delSegs if true, delete the input segments after merging
   * @throws Exception if <code>segments</code> is not an existing directory
   *         or its content cannot be listed
   */
  public FastSegmentMergeTool(String segments, String output, boolean runIndexer, boolean delSegs) throws Exception {
    this.segments = segments;
    this.runIndexer = runIndexer;
    this.delSegs = delSegs;
    File segs = new File(segments);
    if (!segs.exists() || !segs.isDirectory()) throw new Exception("Not a segments dir: " + segs);
    File[] dirs = segs.listFiles(new FileFilter() {
      public boolean accept(File file) {
        return file.isDirectory();
      }
    });
    // listFiles() returns null on an I/O error; Arrays.asList(null) would throw an NPE
    if (dirs == null) throw new Exception("Could not list segments dir: " + segs);
    allsegdirs = Arrays.asList(dirs);
    this.output = output;
  }

  /** Creates a new segment name from the current time (yyyyMMddHHmmss). */
  private String getSegmentName() {
    return new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
  }

  /**
   * Runs the whole merge: opens the input segments, builds a temporary master
   * index of all entries, deletes all but the latest entry for every url and
   * hash value, copies the surviving entries into a new output segment, and
   * then optionally indexes the output and deletes the input segments.
   * Failures are logged, not thrown. Readers, the index reader and the
   * temporary master index are released on every exit path.
   */
  public void run() {
    long start = System.currentTimeMillis();
    File masterDir = null;
    IndexReader ir = null;
    try {
      long total = openSegments();
      LOG.info("TOTAL " + total + " input entries in " + segdirs.size() + " segments.");
      File indexDir = new File(new File(segments), ".fastmerge_index");
      if (!indexDir.mkdirs()) {
        LOG.severe("Could not create a master index dir: " + indexDir);
        return;
      }
      // only from here on is the directory ours to clean up
      masterDir = indexDir;
      LOG.info("Creating master index...");
      buildMasterIndex(masterDir);

      ir = IndexReader.open(masterDir);
      long s1 = System.currentTimeMillis();
      LOG.info("Removing duplicate entries...");
      removeDuplicates(ir);
      System.out.println("* Deduplicating took " + (System.currentTimeMillis() - s1) + " ms");

      File directory = createOutputSegmentDir();
      // restart the timer so that the merge time excludes the deduplication time
      s1 = System.currentTimeMillis();
      int outputCnt = copyUniqueEntries(ir, directory);
      System.out.println("* merging took " + (System.currentTimeMillis() - s1) + " ms");

      ir.close();
      ir = null;
      FileUtil.fullyDelete(masterDir);
      masterDir = null;
      closeReaders();

      if (runIndexer) {
        LOG.info("Creating new segment index...");
        IndexSegment.main(new String[] { directory.toString() });
      }
      if (delSegs) {
        // This deletes also all corrupt segments, which are
        // unusable anyway
        LOG.info("Deleting old segments...");
        for (int k = 0; k < allsegdirs.size(); k++) {
          FileUtil.fullyDelete((File) allsegdirs.get(k));
        }
      }
      long delta = System.currentTimeMillis() - start;
      float seconds = delta / 1000f;
      // floating-point division; avoid Infinity/NaN for runs measured as 0 ms
      float eps = seconds > 0f ? total / seconds : 0f;
      LOG.info("DONE segment merging, INPUT: " + total + " -> OUTPUT: " + outputCnt + " entries in "
              + seconds + " s (" + eps + " entries/sec).");
    } catch (Exception e) {
      e.printStackTrace();
      // toString() rather than getMessage(): the latter may be null
      LOG.severe(e.toString());
    } finally {
      // everything below is a no-op after a successful run
      if (ir != null) {
        try {
          ir.close();
        } catch (Exception ignored) {
          // best effort cleanup after a failure
        }
      }
      if (masterDir != null) {
        try {
          // a left-over master index would make the next run fail at mkdirs()
          FileUtil.fullyDelete(masterDir);
        } catch (Exception ignored) {
          // best effort cleanup after a failure
        }
      }
      closeReaders();
    }
  }

  /**
   * Opens a reader for every input segment directory, skipping the ones that
   * cannot be opened.
   *
   * @return total number of entries in all usable segments
   */
  private long openSegments() {
    segdirs = new ArrayList();
    long total = 0L;
    for (int i = 0; i < allsegdirs.size(); i++) {
      File dir = (File) allsegdirs.get(i);
      SegmentReader sr;
      try {
        sr = new SegmentReader(dir);
      } catch (Exception e) {
        // this segment is hosed, don't use it
        LOG.warning(" - segment " + dir + " is corrupt, skipping all entries.");
        continue;
      }
      segdirs.add(dir);
      total += sr.size;
      LOG.info("Segment " + dir.getName() + ": " + sr.size + " entries.");
      readers.put(dir.getName(), sr);
    }
    return total;
  }

  /** Closes and forgets all open segment readers; safe to call repeatedly. */
  private void closeReaders() {
    for (Iterator iter = readers.values().iterator(); iter.hasNext();) {
      ((SegmentReader) iter.next()).close();
    }
    readers.clear();
  }

  /**
   * Builds and optimizes the master index: one document per readable entry of
   * every open segment, carrying its location ("seg_doc"), url, hash and
   * fetch time.
   */
  private void buildMasterIndex(File masterDir) throws IOException {
    IndexWriter iw = new IndexWriter(masterDir, new WhitespaceAnalyzer(), true);
    boolean closed = false;
    try {
      iw.setUseCompoundFile(false);
      iw.mergeFactor = 30;
      iw.minMergeDocs = 1000;
      long s1 = System.currentTimeMillis();
      long cnt = 0L;
      for (Iterator it = readers.values().iterator(); it.hasNext();) {
        SegmentReader sr = (SegmentReader) it.next();
        FetcherOutput fo = new FetcherOutput();
        for (long i = 0; i < sr.size; i++) {
          try {
            sr.fetcherReader.get(i, fo);
            // make sure the entry is valid, i.e. there is no truncated content
            sr.contentReader.seek(i);
            sr.parseTextReader.seek(i);
            sr.parseDataReader.seek(i);

            Document doc = new Document();
            // Field flags are (store, index, tokenize): url and hash are
            // indexed as single terms, seg_doc and time are stored only.
            doc.add(new Field("seg_doc", sr.dir.getName() + "|" + i, true, false, false));
            doc.add(new Field("url", fo.getUrl().toString(), true, true, false));
            doc.add(new Field("hash", fo.getMD5Hash().toString(), true, true, false));
            doc.add(new Field("time", DateField.timeToString(fo.getFetchDate()), true, false, false));
            iw.addDocument(doc);
            cnt++;
            if (cnt > 0 && (cnt % 10000 == 0)) LOG.info("Processed " + cnt + " entries.");
          } catch (Throwable t) {
            // we can assume the data is invalid from now on - stop reading
            // this segment; entries 0..i-1 were accepted, i.e. i entries
            LOG.info(" - segment " + sr.dir.getName() + " truncated to " + i + " entries (" +
                    t.getMessage() + ")");
            break;
          }
        }
      }
      System.out.println("* Creating index took " + (System.currentTimeMillis() - s1) + " ms");
      s1 = System.currentTimeMillis();
      iw.optimize();
      iw.close();
      closed = true;
      System.out.println("* Optimizing index took " + (System.currentTimeMillis() - s1) + " ms");
    } finally {
      if (!closed) {
        try {
          iw.close();
        } catch (Exception ignored) {
          // the original failure is the one worth reporting
        }
      }
    }
  }

  /**
   * Walks all "url" and "hash" terms of the master index and, for each term,
   * deletes every document except the most recently fetched one.
   */
  private void removeDuplicates(IndexReader ir) throws IOException {
    TermEnum te = ir.terms();
    try {
      while (te.next()) {
        Term t = te.term();
        if (t == null) continue;
        String field = t.field();
        if (!(field.equals("hash") || field.equals("url"))) continue;
        keepLatestOnly(ir, t);
      }
    } finally {
      te.close();
    }
  }

  /**
   * Of all not yet deleted documents containing the given term, keeps only
   * the one with the greatest "time" value and deletes the others.
   */
  private void keepLatestOnly(IndexReader ir, Term t) throws IOException {
    TermDocs td = ir.termDocs(t);
    if (td == null) return;
    try {
      int latestId = -1;
      String latestTime = null;
      while (td.next()) {
        int docid = td.doc();
        if (ir.isDeleted(docid)) continue;
        String dtime = ir.document(docid).get("time");
        if (latestTime == null) {
          // first live document for this term
          latestTime = dtime;
          latestId = docid;
          continue;
        }
        if (dtime.compareTo(latestTime) > 0) {
          // this one is newer - drop the previous candidate
          ir.delete(latestId);
          latestTime = dtime;
          latestId = docid;
        } else {
          ir.delete(docid);
        }
      }
    } finally {
      td.close();
    }
  }

  /**
   * Creates a new, uniquely named segment directory below the output
   * directory.
   *
   * @return the new segment directory
   * @throws Exception if the output path exists but is not a directory
   */
  private File createOutputSegmentDir() throws Exception {
    File directory = new File(output);
    if (directory.exists() && !directory.isDirectory())
      throw new Exception("Output dir is not a directory: " + directory);

    if (!directory.exists()) directory.mkdirs();
    directory = new File(directory, getSegmentName());
    LOG.info("Merging all segments into " + directory.getName());
    directory.mkdirs();
    return directory;
  }

  /**
   * Copies the data of every document that survived deduplication from its
   * source segment into the output segment.
   *
   * @param ir reader of the deduplicated master index
   * @param directory the output segment directory
   * @return number of entries written to the output segment
   */
  private int copyUniqueEntries(IndexReader ir, File directory) throws Exception {
    int outputCnt = 0;
    int merged = 0;
    try {
      fetcherWriter = new ArrayFile.Writer(new LocalFileSystem(), new File(directory, FetcherOutput.DIR_NAME)
              .toString(), FetcherOutput.class);
      contentWriter = new ArrayFile.Writer(new LocalFileSystem(), new File(directory, Content.DIR_NAME).toString(),
              Content.class);
      parseTextWriter = new ArrayFile.Writer(new LocalFileSystem(), new File(directory, ParseText.DIR_NAME).toString(),
              ParseText.class);
      parseDataWriter = new ArrayFile.Writer(new LocalFileSystem(), new File(directory, ParseData.DIR_NAME).toString(),
              ParseData.class);

      FetcherOutput fo = new FetcherOutput();
      Content co = new Content();
      ParseText pt = new ParseText();
      ParseData pd = new ParseData();

      for (int n = 0; n < ir.maxDoc(); n++) {
        if (ir.isDeleted(n)) continue;
        // "seg_doc" is "<segment name>|<entry number>"
        String segDoc = ir.document(n).get("seg_doc");
        int idx = segDoc.indexOf('|');
        String segName = segDoc.substring(0, idx);
        String docName = segDoc.substring(idx + 1);
        SegmentReader sr = (SegmentReader) readers.get(segName);
        if (sr == null) {
          LOG.warning(" - unknown segment " + segName + " - skipping.");
          continue;
        }
        long docid;
        try {
          docid = Long.parseLong(docName);
        } catch (Exception e) {
          continue;
        }
        merged++;
        if (merged > 0 && (merged % 10000 == 0)) LOG.info("Merged " + merged + " entries.");
        try {
          // get data from the reader
          sr.fetcherReader.get(docid, fo);
          sr.contentReader.get(docid, co);
          sr.parseTextReader.get(docid, pt);
          sr.parseDataReader.get(docid, pd);
        } catch (Throwable thr) {
          // don't break the loop, because only one of the segments
          // may be corrupted...
          LOG.fine(" - corrupt entry no. " + docid + " in segment " + sr.dir.getName() + " - skipping.");
          continue;
        }
        // write it back
        fetcherWriter.append(fo);
        contentWriter.append(co);
        parseTextWriter.append(pt);
        parseDataWriter.append(pd);
        outputCnt++;
      }
      // a failure to close means the output may be incomplete - let it surface
      closeWriters();
    } finally {
      try {
        // no-op after the successful close above
        closeWriters();
      } catch (Exception ignored) {
        // the original failure is the one worth reporting
      }
    }
    return outputCnt;
  }

  /**
   * Closes all output writers that are open and forgets them. Every writer is
   * attempted even if an earlier one fails; the first failure is rethrown.
   */
  private void closeWriters() throws Exception {
    ArrayFile.Writer[] writers = { fetcherWriter, contentWriter, parseTextWriter, parseDataWriter };
    fetcherWriter = null;
    contentWriter = null;
    parseTextWriter = null;
    parseDataWriter = null;
    Exception first = null;
    for (int k = 0; k < writers.length; k++) {
      if (writers[k] == null) continue;
      try {
        writers[k].close();
      } catch (Exception e) {
        if (first == null) first = e;
      }
    }
    if (first != null) throw first;
  }

  /**
   * Command-line entry point. The first argument is the input segments
   * directory; see {@link #usage()} for the options.
   */
  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      System.err.println("Too few arguments.\n");
      usage();
      System.exit(-1);
    }
    boolean runIndexer = false;
    boolean delSegs = false;
    String output = null;
    // args[0] is the input directory, options start at index 1
    for (int i = 1; i < args.length; i++) {
      if (args[i].equals("-o")) {
        if (args.length > i + 1) {
          output = args[++i];
          continue;
        } else {
          System.err.println("Required value of '-o' argument missing.\n");
          usage();
          System.exit(-1);
        }
      } else if (args[i].equals("-i")) {
        runIndexer = true;
      } else if (args[i].equals("-ds")) {
        delSegs = true;
      }
    }
    // without -o the merged segment is created next to the input segments
    if (output == null) output = args[0];
    FastSegmentMergeTool st = new FastSegmentMergeTool(args[0], output, runIndexer, delSegs);
    st.run();
  }

  /** Prints the command-line synopsis to standard error. */
  private static void usage() {
    System.err.println("FastSegmentMergeTool <input_segments_dir> [-o <output_segment_dir>] [-i] [-ds]");
    System.err.println("\t<input_segments_dir>\tpath to directory containing\n\t\t\t\tall input segments");
    System.err
            .println("\t-o <output_segment_dir>\t(optional) path to directory which will\n\t\t\t\tcontain a single output segment.\n\t\t\tNOTE: If not present, the original segments path will be used.");
    System.err.println("\t-i\t\t(optional) index the output segment.");
    System.err.println("\t-ds\t\t(optional) delete the original segments when finished.");
    System.err.println();
  }
}

Reply via email to