> Andrzej,
> I hadn't had time to look at this closely until now. I like it very
> much, and find it a little surprising. You use Lucene as a file
> sorter / b-tree-like package, rather than using SequenceFile.Sorter
> and MapFile. Lucene is just as efficient, yet has a simpler API.
I've attached the latest version, in which some parts of the process should be more efficient... Still, the final results are inconclusive, which is why I'm reluctant to commit it: the speed advantage appears to become less dramatic as the size and number of the input segments grow...
> I never would have guessed that Lucene would make such a good
> general-purpose database engine, given that it is only really
> designed to handle document search!
Well, thanks to its excellent design, I might add... ;-)
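The essence of the trick, in miniature: a Lucene index doubles as a sorted, disk-backed key/value map. Below is a minimal sketch against the same Lucene API the attached tool uses (class, field, and path names are purely illustrative, not taken from the tool):

  import org.apache.lucene.analysis.WhitespaceAnalyzer;
  import org.apache.lucene.document.Document;
  import org.apache.lucene.document.Field;
  import org.apache.lucene.index.IndexReader;
  import org.apache.lucene.index.IndexWriter;
  import org.apache.lucene.index.Term;
  import org.apache.lucene.index.TermDocs;
  import org.apache.lucene.index.TermEnum;

  public class LuceneSortDemo {
    public static void main(String[] args) throws Exception {
      // Write phase: each record becomes a tiny document with an indexed
      // key and a stored value. Lucene keeps the terms sorted on disk.
      IndexWriter iw = new IndexWriter("/tmp/sortdemo", new WhitespaceAnalyzer(), true);
      String[][] records = { { "banana", "2" }, { "apple", "1" }, { "cherry", "3" } };
      for (int i = 0; i < records.length; i++) {
        Document doc = new Document();
        doc.add(new Field("key", records[i][0], false, true, false));   // indexed, not stored
        doc.add(new Field("value", records[i][1], true, false, false)); // stored, not indexed
        iw.addDocument(doc);
      }
      iw.optimize();
      iw.close();
      // Read phase: the term enumeration yields the keys in sorted order,
      // and TermDocs gives random access to the matching records.
      IndexReader ir = IndexReader.open("/tmp/sortdemo");
      TermEnum te = ir.terms();
      while (te.next()) {
        Term t = te.term();
        if (!t.field().equals("key")) continue;
        TermDocs td = ir.termDocs(t);
        while (td.next()) {
          System.out.println(t.text() + " -> " + ir.document(td.doc()).get("value"));
        }
        td.close();
      }
      ir.close();
    }
  }

Walking the term enumeration visits the keys in lexicographic order (the "sort"), and TermDocs provides random access to the matching records (the "b-tree lookup").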
--
Best regards,
Andrzej Bialecki
-------------------------------------------------
Software Architect, System Integration Specialist
CEN/ISSS EC Workshop, ECIMF project chair
EU FP6 E-Commerce Expert/Evaluator
-------------------------------------------------
FreeBSD developer (http://www.freebsd.org)
/* Copyright (c) 2004 The Nutch Organization. All rights reserved. */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
package net.nutch.tools;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.nutch.fetcher.FetcherOutput;
import net.nutch.indexer.DeleteDuplicates;
import net.nutch.indexer.IndexMerger;
import net.nutch.indexer.IndexSegment;
import net.nutch.io.ArrayFile;
import net.nutch.io.Writable;
import net.nutch.parse.ParseData;
import net.nutch.parse.ParseText;
import net.nutch.protocol.Content;
import net.nutch.util.*;
import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.document.DateField;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermDocs;
import org.apache.lucene.index.TermEnum;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Hits;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
/**
 * This class cleans up accumulated segment data and merges it into a single
 * segment containing no duplicates.
*
* <p>
 * Unlike SegmentMergeTool, this tool has no prerequisites for correct
 * operation other than a set of already fetched segments. It does not use
 * DeleteDuplicates; instead, it builds its own "master" index of all pages in
 * all segments, then walks sequentially through that index and picks a single
 * page for every unique value of URL or content hash.
* </p>
* <p>
* The newly created segment is then optionally indexed, so that it can be
* either merged with more new segments, or used for searching as it is.
* </p>
* <p>
* Old segments may be optionally removed, because all needed data has already
* been copied to the new merged segment.
* </p>
* <p>
 * You may run FastSegmentMergeTool directly, with all options turned on, i.e.
 * merge the segments into one output segment, index it, and then delete the
 * original segment data.
* </p>
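 *
 * <p>
 * Example command line (paths are illustrative):
 * </p>
 *
 * <pre>
 * java net.nutch.tools.FastSegmentMergeTool segments -o merged -i -ds
 * </pre>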
*
* @author Andrzej Bialecki <[EMAIL PROTECTED]>
*/
public class FastSegmentMergeTool {
public static final Logger LOG = LogFormatter.getLogger("net.nutch.tools.FastSegmentMergeTool");
private String segments = null;
private String output = null;
private List segdirs = null;
private List allsegdirs = null;
private boolean runIndexer = false;
private boolean delSegs = false;
// This class holds together all data readers for an existing segment
static class SegmentReader {
public ArrayFile.Reader fetcherReader;
public ArrayFile.Reader contentReader;
public ArrayFile.Reader parseTextReader;
public ArrayFile.Reader parseDataReader;
public long size = 0L;
public File dir = null;
public SegmentReader(File dir) throws Exception {
this.dir = dir;
fetcherReader = new ArrayFile.Reader(new LocalFileSystem(), new File(dir, FetcherOutput.DIR_NAME).toString());
contentReader = new ArrayFile.Reader(new LocalFileSystem(), new File(dir, Content.DIR_NAME).toString());
parseTextReader = new ArrayFile.Reader(new LocalFileSystem(), new File(dir, ParseText.DIR_NAME).toString());
parseDataReader = new ArrayFile.Reader(new LocalFileSystem(), new File(dir, ParseData.DIR_NAME).toString());
// count the number of valid entries.
// XXX We assume that all other data files contain the
// XXX same number of valid entries - which is not always
// XXX true if Fetcher crashed in the middle of update.
// XXX We compensate for this later, when actually
// XXX reading the entries.
Writable w = new FetcherOutput();
try {
while (fetcherReader.next(w) != null)
size++;
} catch (EOFException eof) {
// the file is truncated - probably due to a crashed fetcher.
// Use just the part that we can...
LOG.warning(" - segment " + dir + " is corrupt, using only " + size + " entries.");
}
// reposition to the start
fetcherReader.reset();
}
// Close all readers
    public void close() {
      try { fetcherReader.close(); } catch (Exception e) {}
      try { contentReader.close(); } catch (Exception e) {}
      try { parseTextReader.close(); } catch (Exception e) {}
      try { parseDataReader.close(); } catch (Exception e) {}
    }
public void reset() throws IOException {
fetcherReader.reset();
contentReader.reset();
parseTextReader.reset();
parseDataReader.reset();
}
}
private HashMap readers = new HashMap();
// writers for the output segment
private ArrayFile.Writer fetcherWriter;
private ArrayFile.Writer contentWriter;
private ArrayFile.Writer parseTextWriter;
private ArrayFile.Writer parseDataWriter;
public FastSegmentMergeTool(String segments, String output, boolean runIndexer, boolean delSegs) throws Exception {
this.segments = segments;
this.runIndexer = runIndexer;
this.delSegs = delSegs;
File segs = new File(segments);
if (!segs.exists() || !segs.isDirectory()) throw new Exception("Not a segments dir: " + segs);
File[] dirs = segs.listFiles(new FileFilter() {
public boolean accept(File file) {
        return file.isDirectory();
}
});
allsegdirs = Arrays.asList(dirs);
this.output = output;
}
// Create a new segment name
private String getSegmentName() {
return new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System.currentTimeMillis()));
}
public void run() {
long start = System.currentTimeMillis();
try {
segdirs = new ArrayList();
// open all segments
long total = 0L;
for (int i = 0; i < allsegdirs.size(); i++) {
File dir = (File) allsegdirs.get(i);
SegmentReader sr = null;
try {
sr = new SegmentReader(dir);
} catch (Exception e) {
// this segment is hosed, don't use it
LOG.warning(" - segment " + dir + " is corrupt, skipping all entries.");
continue;
}
segdirs.add(dir);
total += sr.size;
LOG.info("Segment " + dir.getName() + ": " + sr.size + " entries.");
readers.put(dir.getName(), sr);
}
LOG.info("TOTAL " + total + " input entries in " + segdirs.size() + " segments.");
File masterDir = new File(new File(segments), ".fastmerge_index");
if (!masterDir.mkdirs()) {
LOG.severe("Could not create a master index dir: " + masterDir);
return;
}
LOG.info("Creating master index...");
IndexWriter iw = new IndexWriter(masterDir, new WhitespaceAnalyzer(), true);
iw.setUseCompoundFile(false);
iw.mergeFactor = 30;
iw.minMergeDocs = 1000;
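      // Buffer more documents in RAM and merge wider than the defaults -
      // this is a write-once bulk load, so larger batches speed up indexing.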
long s1 = System.currentTimeMillis();
Iterator it = readers.values().iterator();
long cnt = 0L;
while (it.hasNext()) {
SegmentReader sr = (SegmentReader) it.next();
// read all parts - to make sure all data in each entry is valid
FetcherOutput fo = new FetcherOutput();
//Content co = new Content();
//ParseText pt = new ParseText();
//ParseData pd = new ParseData();
for (long i = 0; i < sr.size; i++) {
try {
sr.fetcherReader.get(i, fo);
// make sure the entry is valid, i.e. there is no truncated content
sr.contentReader.seek(i);
sr.parseTextReader.seek(i);
sr.parseDataReader.seek(i);
Document doc = new Document();
doc.add(new Field("seg_doc", sr.dir.getName()+ "|" + i, true, false, false));
doc.add(new Field("url", fo.getUrl().toString(), true, true, false));
doc.add(new Field("hash", fo.getMD5Hash().toString(), true, true, false));
doc.add(new Field("time", DateField.timeToString(fo.getFetchDate()), true, false, false));
iw.addDocument(doc);
cnt++;
if (cnt > 0 && (cnt % 10000 == 0)) LOG.info("Processed " + cnt + " entries.");
          } catch (Throwable t) {
            // we can assume the data is invalid from now on - break here
            LOG.info(" - segment " + sr.dir.getName() + " truncated to " + i + " entries (" +
                    t.getMessage() + ")");
            break;
          }
}
}
System.out.println("* Creating index took " + (System.currentTimeMillis() - s1) + " ms");
s1 = System.currentTimeMillis();
iw.optimize();
iw.close();
System.out.println("* Optimizing index took " + (System.currentTimeMillis() - s1) + " ms");
IndexReader ir = IndexReader.open(masterDir);
int outputCnt = 0, i = 0;
s1 = System.currentTimeMillis();
LOG.info("Removing duplicate entries...");
TermEnum te = ir.terms();
while(te.next()) {
Term t = te.term();
if (t == null) continue;
if (!(t.field().equals("hash") || t.field().equals("url"))) continue;
// get only the latest document, and delete all others - because they
// have the same URL or hash
TermDocs td = ir.termDocs(t);
if (td == null) continue;
int id = -1;
String time = null;
Document doc = null;
//System.out.println("-------------");
while (td.next()) {
int docid = td.doc();
if (!ir.isDeleted(docid)) {
doc = ir.document(docid);
if (time == null) {
time = doc.get("time");
id = docid;
//System.out.println(" - set " + time);
continue;
}
String dtime = doc.get("time");
if (dtime.compareTo(time) > 0) {
if (id != -1) {
//System.out.println(" - del " + time + ", set " + dtime);
ir.delete(id);
}
time = dtime;
id = docid;
} else {
//System.out.println(" - del " + dtime);
ir.delete(docid);
}
}
}
}
//ir.close();
//iw = new IndexWriter(masterDir, new WhitespaceAnalyzer(), false);
//iw.optimize();
System.out.println("* Deduplicating took " + (System.currentTimeMillis() - s1) + " ms");
//ir = IndexReader.open(masterDir);
File directory = new File(output);
if (directory.exists() && !directory.isDirectory())
throw new Exception("Output dir is not a directory: " + directory);
if (!directory.exists()) directory.mkdirs();
directory = new File(directory, getSegmentName());
LOG.info("Merging all segments into " + directory.getName());
directory.mkdirs();
fetcherWriter = new ArrayFile.Writer(new LocalFileSystem(), new File(directory, FetcherOutput.DIR_NAME)
.toString(), FetcherOutput.class);
contentWriter = new ArrayFile.Writer(new LocalFileSystem(), new File(directory, Content.DIR_NAME).toString(),
Content.class);
parseTextWriter = new ArrayFile.Writer(new LocalFileSystem(), new File(directory, ParseText.DIR_NAME).toString(),
ParseText.class);
parseDataWriter = new ArrayFile.Writer(new LocalFileSystem(), new File(directory, ParseData.DIR_NAME).toString(),
ParseData.class);
FetcherOutput fo = new FetcherOutput();
Content co = new Content();
ParseText pt = new ParseText();
ParseData pd = new ParseData();
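      // Copy every surviving (non-deleted) entry from its source segment
      // into the merged output segment, reusing the Writable instances above.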
for (int n = 0; n < ir.maxDoc(); n++) {
if (ir.isDeleted(n)) {
//System.out.println("-del");
continue;
}
Document doc = ir.document(n);
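        // "seg_doc" encodes the entry's origin as "<segment name>|<record number>".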
String segDoc = doc.get("seg_doc");
int idx = segDoc.indexOf('|');
String segName = segDoc.substring(0, idx);
String docName = segDoc.substring(idx + 1);
SegmentReader sr = (SegmentReader) readers.get(segName);
long docid;
try {
docid = Long.parseLong(docName);
} catch (Exception e) {
continue;
}
i++;
if (i > 0 && (i % 10000 == 0)) LOG.info("Merged " + i + " entries.");
try {
// get data from the reader
sr.fetcherReader.get(docid, fo);
sr.contentReader.get(docid, co);
sr.parseTextReader.get(docid, pt);
sr.parseDataReader.get(docid, pd);
} catch (Throwable thr) {
// don't break the loop, because only one of the segments
// may be corrupted...
LOG.fine(" - corrupt entry no. " + docid + " in segment " + sr.dir.getName() + " - skipping.");
continue;
}
// write it back
fetcherWriter.append(fo);
contentWriter.append(co);
parseTextWriter.append(pt);
parseDataWriter.append(pd);
outputCnt++;
}
System.out.println("* merging took " + (System.currentTimeMillis() - s1) + " ms");
ir.close();
fetcherWriter.close();
contentWriter.close();
parseTextWriter.close();
parseDataWriter.close();
FileUtil.fullyDelete(masterDir);
      for (Iterator iter = readers.values().iterator(); iter.hasNext();) {
        SegmentReader sr = (SegmentReader) iter.next();
        sr.close();
      }
if (runIndexer) {
LOG.info("Creating new segment index...");
IndexSegment.main(new String[] { directory.toString() });
}
if (delSegs) {
// This deletes also all corrupt segments, which are
// unusable anyway
LOG.info("Deleting old segments...");
        for (int k = 0; k < allsegdirs.size(); k++) {
FileUtil.fullyDelete((File) allsegdirs.get(k));
}
}
long delta = System.currentTimeMillis() - start;
      float eps = (float) total * 1000.0f / (float) delta;
LOG.info("DONE segment merging, INPUT: " + total + " -> OUTPUT: " + outputCnt + " entries in "
+ ((float) delta / 1000f) + " s (" + eps + " entries/sec).");
} catch (Exception e) {
e.printStackTrace();
LOG.severe(e.getMessage());
}
}
public static void main(String[] args) throws Exception {
if (args.length < 1) {
System.err.println("Too few arguments.\n");
usage();
System.exit(-1);
}
boolean runIndexer = false;
boolean delSegs = false;
String output = null;
for (int i = 1; i < args.length; i++) {
if (args[i].equals("-o")) {
if (args.length > i + 1) {
output = args[++i];
continue;
} else {
System.err.println("Required value of '-o' argument missing.\n");
usage();
System.exit(-1);
}
} else if (args[i].equals("-i"))
runIndexer = true;
else if (args[i].equals("-ds")) delSegs = true;
}
if (output == null) output = args[0];
FastSegmentMergeTool st = new FastSegmentMergeTool(args[0], output, runIndexer, delSegs);
st.run();
}
private static void usage() {
System.err.println("FastSegmentMergeTool <input_segments_dir> [-o <output_segment_dir>] [-i] [-ds]");
System.err.println("\t<input_segments_dir>\tpath to directory containing\n\t\t\t\tall input segments");
    System.err.println("\t-o <output_segment_dir>\t(optional) path to directory which will\n\t\t\t\tcontain a single output segment.\n\t\t\tNOTE: If not present, the original segments path will be used.");
System.err.println("\t-i\t\t(optional) index the output segment.");
System.err.println("\t-ds\t\t(optional) delete the original segments when finished.");
System.err.println();
}
}
