Update of /cvsroot/nutch/nutch/src/java/net/nutch/tools
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv2923
Modified Files:
	SegmentMergeTool.java
Log Message:
This is a completely rewritten version of the tool. Please see the Javadoc
and the command-line synopsis for usage.

Index: SegmentMergeTool.java
===================================================================
RCS file: /cvsroot/nutch/nutch/src/java/net/nutch/tools/SegmentMergeTool.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -C2 -d -r1.6 -r1.7
*** SegmentMergeTool.java	6 Oct 2004 22:27:25 -0000	1.6
--- SegmentMergeTool.java	14 Nov 2004 21:31:28 -0000	1.7
***************
*** 4,307 ****
  package net.nutch.tools;
  
- import java.io.EOFException;
  import java.io.File;
  import java.io.FileFilter;
- import java.text.SimpleDateFormat;
  import java.util.ArrayList;
  import java.util.Arrays;
- import java.util.Date;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.util.List;
! import java.util.ListIterator;
! import java.util.logging.Level;
  import java.util.logging.Logger;
  
  import net.nutch.fetcher.FetcherOutput;
- import net.nutch.indexer.DeleteDuplicates;
- import net.nutch.indexer.IndexMerger;
  import net.nutch.indexer.IndexSegment;
! import net.nutch.io.ArrayFile;
! import net.nutch.io.Writable;
  import net.nutch.parse.ParseData;
  import net.nutch.parse.ParseText;
  import net.nutch.protocol.Content;
! import net.nutch.util.*;
  
  import org.apache.lucene.document.Document;
  import org.apache.lucene.index.IndexReader;
  
  /**
!  * This class cleans up accumulated segments data, and merges them
!  * into a single segment, with no duplicates in it. It uses a "master"
!  * unique index of all documents, which either must already exist
!  * (by running IndexSegment for each segment, then DeleteDuplicates,
!  * and finally IndexMerger), OR the tool can create it just before
!  * merging, including creation of per-segment sub-indices as needed.
!  * <p>The newly created segment is then optionally indexed, so that
!  * it can either be merged with more new segments, or used for
!  * searching as it is.</p>
!  * <p>The original "master" index can optionally be deleted -
!  * since it still points to the old segments, the new index should
!  * be used instead. Old segments may be optionally removed as well,
!  * because all needed data has already been copied to the new merged
!  * segment.</p>
!  * <p>If you use all the provided functionality, you can save
!  * some manual steps in Nutch operational procedures. After you've
!  * run a couple of cycles of fetchlist generation, fetching, DB
!  * updating and analyzing, you end up with several segments, possibly
!  * containing duplicates. You may then directly run the
!  * SegmentMergeTool, with all options turned on, i.e. to first
!  * create the master unique index, merge segments into the output
!  * segment, index it, and then delete the original segments data and
!  * the master index.</p>
   *
!  * @author Andrzej Bialecki <[EMAIL PROTECTED]>
   */
! public class SegmentMergeTool {
! 
!   public static final Logger LOG =
!     LogFormatter.getLogger("net.nutch.tools.SegmentMergeTool");
!   private String master = null;
!   private String segments = null;
!   private String output = null;
    private List segdirs = null;
    private List allsegdirs = null;
    private boolean runIndexer = false;
-   private boolean createMaster = false;
    private boolean delSegs = false;
-   private boolean delMaster = false;
- 
-   // This class holds together all data readers for an existing segment
-   static class SegmentReader {
-     public ArrayFile.Reader fetcherReader;
-     public ArrayFile.Reader contentReader;
-     public ArrayFile.Reader parseTextReader;
-     public ArrayFile.Reader parseDataReader;
-     public long size = 0L;
- 
-     public SegmentReader(File dir) throws Exception {
-       fetcherReader = new ArrayFile.Reader(new LocalFileSystem(), new File(dir, FetcherOutput.DIR_NAME).toString());
-       contentReader = new ArrayFile.Reader(new LocalFileSystem(), new File(dir, Content.DIR_NAME).toString());
-       parseTextReader = new ArrayFile.Reader(new LocalFileSystem(), new File(dir, ParseText.DIR_NAME).toString());
-       parseDataReader = new ArrayFile.Reader(new LocalFileSystem(), new File(dir, ParseData.DIR_NAME).toString());
-       // count the number of valid entries.
-       // XXX We assume that all other data files contain the
-       // XXX same number of valid entries - which is not always
-       // XXX true if Fetcher crashed in the middle of update.
-       // XXX We compensate for this later, when actually
-       // XXX reading the entries.
-       Writable w = new FetcherOutput();
-       try {
-         while (fetcherReader.next(w) != null)
-           size++;
-       } catch (EOFException eof) {
-         // the file is truncated - probably due to a crashed fetcher.
-         // Use just the part that we can...
-         LOG.warning(" - segment " + dir + " is corrupt, using only " + size + " entries.");
-       }
-       // reposition to the start
-       fetcherReader.reset();
-     }
- 
-     // Close all readers
-     public void close() {
-       try {
-         fetcherReader.close();
-       } catch (Exception e) {};
-       try {
-         contentReader.close();
-       } catch (Exception e) {};
-       try {
-         parseTextReader.close();
-       } catch (Exception e) {};
-       try {
-         parseDataReader.close();
-       } catch (Exception e) {};
-     }
-   }
- 
    private HashMap readers = new HashMap();
!   // writers for the output segment
!   private ArrayFile.Writer fetcherWriter;
!   private ArrayFile.Writer contentWriter;
!   private ArrayFile.Writer parseTextWriter;
!   private ArrayFile.Writer parseDataWriter;
! 
!   public SegmentMergeTool(String segments, String output, String master,
!                           boolean createMaster, boolean runIndexer,
!                           boolean delSegs, boolean delMaster) throws Exception {
!     this.master = master;
      this.segments = segments;
-     this.createMaster = createMaster;
      this.runIndexer = runIndexer;
      this.delSegs = delSegs;
!     this.delMaster = delMaster;
!     File segs = new File(segments);
!     if (!segs.exists() || !segs.isDirectory()) throw new Exception("Not a segments dir: " + segs);
!     File[] dirs = segs.listFiles(new FileFilter() {
!       public boolean accept(File file) {
!         if (file.isDirectory()) return true;
!         return false;
!       }
!     });
!     allsegdirs = Arrays.asList(dirs);
      this.output = output;
    }
! 
!   // Create a new segment name
!   private String getSegmentName() {
!     return new SimpleDateFormat("yyyyMMddHHmmss").format
!       (new Date(System.currentTimeMillis()));
    }
! 
    public void run() {
      try {
        segdirs = new ArrayList();
        // open all segments
-       long total = 0L;
        for (int i = 0; i < allsegdirs.size(); i++) {
!         File dir = (File)allsegdirs.get(i);
          SegmentReader sr = null;
          try {
!           sr = new SegmentReader(dir);
          } catch (Exception e) {
!           // this segment is hosed, don't use it
!           LOG.warning(" - segment " + dir + " is corrupt, skipping all entries.");
!           continue;
          }
          segdirs.add(dir);
!         total += sr.size;
!         LOG.info("Segment " + dir.getName() + ": "
!           + sr.size + " entries.");
          readers.put(dir.getName(), sr);
        }
!       LOG.info("TOTAL " + total + " input entries in " + segdirs.size() + " segments.");
!       File masterDir = null;
!       if (master != null) masterDir = new File(master);
!       LOG.info("Looking for master index in " + masterDir);
!       if (masterDir == null || !IndexReader.indexExists(masterDir)) {
!         if (!createMaster) {
!           LOG.severe("No master index, and createMaster == false");
!           return;
!         }
!         masterDir = new File(new File(segments).getParentFile(), "index");
!         if (!masterDir.mkdirs()) {
!           LOG.severe("Could not create a master index dir: " + masterDir);
!           return;
!         }
!         LOG.info("Creating master unique index...");
!         // check that all segment indexes exist. If not, create them.
!         for (int i = 0; i < segdirs.size(); i++) {
!           File dir = (File)segdirs.get(i);
!           File indexerDone = new File(dir, IndexSegment.DONE_NAME);
!           if (!indexerDone.exists()) {
!             // Index this segment
!             LOG.info(" - creating missing index for " + dir.getName());
!             IndexSegment.main(new String[]{dir.toString()});
            }
          }
!         LOG.info(" - deleting duplicates from indexes in " + segments);
!         DeleteDuplicates.main(new String[]{segments, new File(segments).getParentFile().toString()});
!         LOG.info(" - creating merged index in " + masterDir);
!         String[] args = new String[segdirs.size() + 1];
!         args[0] = masterDir.toString();
!         for (int i = 0; i < segdirs.size(); i++) {
!           args[i + 1] = ((File)segdirs.get(i)).toString();
          }
-         IndexMerger.main(args);
        }
        IndexReader ir = IndexReader.open(masterDir);
!       File directory = new File(output);
!       if (directory.exists() && !directory.isDirectory()) throw new Exception("Output dir is not a directory: " + directory);
! 
!       if (!directory.exists()) directory.mkdirs();
!       directory = new File(directory, getSegmentName());
!       LOG.info("Merging all segments into " + directory);
!       directory.mkdirs();
!       fetcherWriter = new ArrayFile.Writer
!         (new LocalFileSystem(), new File(directory, FetcherOutput.DIR_NAME).toString(),
!          FetcherOutput.class);
!       contentWriter = new ArrayFile.Writer
!         (new LocalFileSystem(), new File(directory, Content.DIR_NAME).toString(), Content.class);
!       parseTextWriter = new ArrayFile.Writer
!         (new LocalFileSystem(), new File(directory, ParseText.DIR_NAME).toString(), ParseText.class);
!       parseDataWriter = new ArrayFile.Writer
!         (new LocalFileSystem(), new File(directory, ParseData.DIR_NAME).toString(), ParseData.class);
! 
        FetcherOutput fo = new FetcherOutput();
        Content co = new Content();
        ParseText pt = new ParseText();
        ParseData pd = new ParseData();
- 
-       int docCnt = ir.numDocs();
        int outputCnt = 0;
!       for (int i = 0; i < docCnt; i++) {
!         if (i > 0 && (i % 500 == 0)) LOG.info("Processed " + i + " entries.");
!         if (ir.isDeleted(i)) {
!           LOG.fine("\n- skip deleted doc # " + i);
!           continue;
!         }
!         Document doc = ir.document(i);
!         String segName = doc.get("segment");
!         SegmentReader sr = (SegmentReader)readers.get(segName);
!         if (sr == null) {
!           LOG.warning("\n- no SegmentReader for " + segName);
            continue;
          }
!         String docNo = doc.get("docNo");
!         long docid = -1L;
          try {
!           docid = Long.parseLong(docNo, 16);
          } catch (Exception e) {
-           LOG.warning("\n- bad docNo: " + docNo);
            continue;
          }
          try {
!           // get data from the reader
!           sr.fetcherReader.get(docid, fo);
!           sr.contentReader.get(docid, co);
!           sr.parseTextReader.get(docid, pt);
!           sr.parseDataReader.get(docid, pd);
!         } catch (Throwable t) {
!           // don't break the loop, because only one of the segments
!           // may be corrupted...
!           LOG.fine(" - corrupt entry no. " + docid + " in segment " + segName + " - skipping.");
!           continue;
          }
!         // write it back
!         fetcherWriter.append(fo);
!         contentWriter.append(co);
!         parseTextWriter.append(pt);
!         parseDataWriter.append(pd);
          outputCnt++;
        }
        ir.close();
!       fetcherWriter.close();
!       contentWriter.close();
!       parseTextWriter.close();
!       parseDataWriter.close();
!       for (Iterator i = readers.keySet().iterator(); i.hasNext(); ) {
!         SegmentReader sr = (SegmentReader)readers.get(i.next());
          sr.close();
        }
        if (runIndexer) {
!         LOG.info("Creating new segment index...");
!         IndexSegment.main(new String[]{directory.toString()});
        }
        if (delSegs) {
          // This also deletes all corrupt segments, which are
          // unusable anyway
!         LOG.info("Deleting old segments...");
!         for (int i = 0; i < allsegdirs.size(); i++) {
!           FileUtil.fullyDelete((File)allsegdirs.get(i));
          }
        }
!       if (delMaster) {
!         LOG.info("Deleting old master index...");
!         FileUtil.fullyDelete(masterDir);
!       }
!       LOG.info("DONE segment merging, INPUT: " + total + " -> OUTPUT: " + outputCnt + " entries.");
      } catch (Exception e) {
        e.printStackTrace();
--- 4,431 ----
  package net.nutch.tools;
  
  import java.io.File;
  import java.io.FileFilter;
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.util.List;
! import java.util.Vector;
  import java.util.logging.Logger;
  
  import net.nutch.fetcher.FetcherOutput;
  import net.nutch.indexer.IndexSegment;
! import net.nutch.io.MD5Hash;
  import net.nutch.parse.ParseData;
  import net.nutch.parse.ParseText;
  import net.nutch.protocol.Content;
! import net.nutch.segment.SegmentReader;
! import net.nutch.segment.SegmentWriter;
! import net.nutch.util.FileUtil;
! import net.nutch.util.LogFormatter;
! import net.nutch.util.NutchFileSystem;
  
+ import org.apache.lucene.analysis.WhitespaceAnalyzer;
+ import org.apache.lucene.document.DateField;
  import org.apache.lucene.document.Document;
+ import org.apache.lucene.document.Field;
  import org.apache.lucene.index.IndexReader;
+ import org.apache.lucene.index.IndexWriter;
+ import org.apache.lucene.index.Term;
+ import org.apache.lucene.index.TermDocs;
+ import org.apache.lucene.index.TermEnum;
  
  /**
!  * This class cleans up accumulated segments data, and merges them into a single
!  * (or optionally multiple) segment(s), with no duplicates.
   *
!  * <p>
!  * There are no prerequisites for its correct
!  * operation except for a set of already fetched segments (they don't have to
!  * contain parsed content; only fetcher output is required). This tool does not
!  * use DeleteDuplicates, but creates its own "master" index of all pages in all
!  * segments. Then it walks sequentially through this index and picks up only
!  * the most recent version of each page, for every unique value of URL or content hash.
!  * </p>
!  * <p>If some of the input segments are corrupted, this tool will attempt to
!  * repair them, using the
!  * {@link net.nutch.segment.SegmentReader#fixSegment(NutchFileSystem, File, boolean, boolean, boolean, boolean)} method.</p>
!  * <p>The output segment can optionally be split on the fly into several segments
!  * of fixed length.</p>
!  * <p>
!  * The newly created segment(s) can then be optionally indexed, so that they can
!  * either be merged with more new segments, or used for searching as they are.
!  * </p>
!  * <p>
!  * Old segments may be optionally removed, because all needed data has already
!  * been copied to the new merged segment. NOTE: this tool will also remove all
!  * corrupted input segments, which are not usable anyway - however, this option
!  * may be dangerous if you inadvertently included non-segment directories as
!  * input...</p>
!  * <p>
!  * You may want to run SegmentMergeTool instead of following the manual procedures,
!  * with all options turned on, i.e. to merge segments into the output segment(s),
!  * index them, and then delete the original segments data.
!  * </p>
!  *
!  * @author Andrzej Bialecki <[EMAIL PROTECTED]>
   */
! public class SegmentMergeTool implements Runnable {
! 
!   public static final Logger LOG = LogFormatter.getLogger("net.nutch.tools.SegmentMergeTool");
! 
!   /** Log progress update every LOG_STEP items. */
!   public static int LOG_STEP = 20000;
!   /** Temporary de-dup index size. Larger indexes tend to slow down indexing.
!    * Too many indexes slow down the subsequent index merging. It's a tradeoff value...
!    */
!   public static int INDEX_SIZE = 250000;
!   public static int INDEX_MERGE_FACTOR = 30;
!   public static int INDEX_MIN_MERGE_DOCS = 100;
! 
!   private NutchFileSystem nfs = null;
!   private File[] segments = null;
!   private int stage = SegmentMergeStatus.STAGE_OPENING;
!   private long totalRecords = 0L;
!   private long processedRecords = 0L;
!   private long start = 0L;
!   private long maxCount = Long.MAX_VALUE;
!   private File output = null;
    private List segdirs = null;
    private List allsegdirs = null;
    private boolean runIndexer = false;
    private boolean delSegs = false;
    private HashMap readers = new HashMap();
! 
!   /**
!    * Create a SegmentMergeTool.
!    * @param nfs filesystem
!    * @param segments list of input segments
!    * @param output output directory, where output segments will be created
!    * @param maxCount maximum number of records per output segment. If this
!    *        value is 0, then the default value {@link Long#MAX_VALUE} is used.
!    * @param runIndexer run indexer on output segment(s)
!    * @param delSegs delete input segments when finished
!    * @throws Exception
!    */
!   public SegmentMergeTool(NutchFileSystem nfs, File[] segments, File output, long maxCount, boolean runIndexer, boolean delSegs) throws Exception {
!     this.nfs = nfs;
      this.segments = segments;
      this.runIndexer = runIndexer;
      this.delSegs = delSegs;
!     if (maxCount > 0) this.maxCount = maxCount;
!     allsegdirs = Arrays.asList(segments);
      this.output = output;
+     if (nfs.exists(output)) {
+       if (!nfs.isDirectory(output))
+         throw new Exception("Output is not a directory: " + output);
+     } else nfs.mkdirs(output);
    }
! 
!   public static class SegmentMergeStatus {
!     public static final int STAGE_OPENING = 0;
!     public static final int STAGE_MASTERIDX = 1;
!     public static final int STAGE_MERGEIDX = 2;
!     public static final int STAGE_DEDUP = 3;
!     public static final int STAGE_WRITING = 4;
!     public static final int STAGE_INDEXING = 5;
!     public static final int STAGE_DELETING = 6;
!     public static final String[] stages = {
!       "opening input segments",
!       "creating master index",
!       "merging sub-indexes",
!       "deduplicating",
!       "writing output segment(s)",
!       "indexing output segment(s)",
!       "deleting input segments"
!     };
!     public int stage;
!     public File[] inputSegments;
!     public long startTime, curTime;
!     public long totalRecords;
!     public long processedRecords;
! 
!     public SegmentMergeStatus() {};
! 
!     public SegmentMergeStatus(int stage, File[] inputSegments, long startTime,
!                               long totalRecords, long processedRecords) {
!       this.stage = stage;
!       this.inputSegments = inputSegments;
!       this.startTime = startTime;
!       this.curTime = System.currentTimeMillis();
!       this.totalRecords = totalRecords;
!       this.processedRecords = processedRecords;
!     }
    }
! 
!   public SegmentMergeStatus getStatus() {
!     SegmentMergeStatus status = new SegmentMergeStatus(stage, segments, start,
!       totalRecords, processedRecords);
!     return status;
!   }
! 
!   /** Run the tool, periodically reporting progress. */
    public void run() {
+     start = System.currentTimeMillis();
+     stage = SegmentMergeStatus.STAGE_OPENING;
+     long delta;
+     LOG.info("* Opening " + allsegdirs.size() + " segments:");
      try {
        segdirs = new ArrayList();
        // open all segments
        for (int i = 0; i < allsegdirs.size(); i++) {
!         File dir = (File) allsegdirs.get(i);
          SegmentReader sr = null;
          try {
!           // try to autofix it if corrupted...
!           sr = new SegmentReader(nfs, dir, true);
          } catch (Exception e) {
!           // this segment is hosed beyond repair, don't use it
!           continue;
          }
          segdirs.add(dir);
!         totalRecords += sr.size;
!         LOG.info(" - segment " + dir.getName() + ": " + sr.size + " records.");
          readers.put(dir.getName(), sr);
        }
!       long total = totalRecords;
!       LOG.info("* TOTAL " + total + " input records in " + segdirs.size() + " segments.");
!       LOG.info("* Creating master index...");
!       stage = SegmentMergeStatus.STAGE_MASTERIDX;
!       // XXX Note that Lucene indexes don't work with NutchFileSystem for now.
!       // XXX For now always assume LocalFileSystem here...
!       Vector masters = new Vector();
!       File fsmtIndexDir = new File(output, ".fastmerge_index");
!       File masterDir = new File(fsmtIndexDir, "0");
!       if (!masterDir.mkdirs()) {
!         LOG.severe("Could not create a master index dir: " + masterDir);
!         return;
!       }
!       masters.add(masterDir);
!       IndexWriter iw = new IndexWriter(masterDir, new WhitespaceAnalyzer(), true);
!       iw.setUseCompoundFile(false);
!       iw.mergeFactor = INDEX_MERGE_FACTOR;
!       iw.minMergeDocs = INDEX_MIN_MERGE_DOCS;
!       long s1 = System.currentTimeMillis();
!       Iterator it = readers.values().iterator();
!       processedRecords = 0L;
!       delta = System.currentTimeMillis();
!       while (it.hasNext()) {
!         SegmentReader sr = (SegmentReader) it.next();
!         String name = sr.segmentDir.getName();
!         FetcherOutput fo = new FetcherOutput();
!         for (long i = 0; i < sr.size; i++) {
!           try {
!             if (!sr.get(i, fo, null, null, null)) break;
! 
!             Document doc = new Document();
!             doc.add(new Field("sd", name + "|" + i, true, false, false));
!             doc.add(new Field("uh", MD5Hash.digest(fo.getUrl().toString()).toString(), true, true, false));
!             doc.add(new Field("ch", fo.getMD5Hash().toString(), true, true, false));
!             doc.add(new Field("time", DateField.timeToString(fo.getFetchDate()), true, false, false));
!             iw.addDocument(doc);
!             processedRecords++;
!             if (processedRecords > 0 && (processedRecords % LOG_STEP == 0)) {
!               LOG.info(" Processed " + processedRecords + " records (" +
!                 (float)(LOG_STEP * 1000)/(float)(System.currentTimeMillis() - delta) + " rec/s)");
!               delta = System.currentTimeMillis();
!             }
!             if (processedRecords > 0 && (processedRecords % INDEX_SIZE == 0)) {
!               iw.optimize();
!               iw.close();
!               LOG.info(" - creating next subindex...");
!               masterDir = new File(fsmtIndexDir, "" + masters.size());
!               if (!masterDir.mkdirs()) {
!                 LOG.severe("Could not create a master index dir: " + masterDir);
!                 return;
!               }
!               masters.add(masterDir);
!               iw = new IndexWriter(masterDir, new WhitespaceAnalyzer(), true);
!               iw.setUseCompoundFile(false);
!               iw.mergeFactor = INDEX_MERGE_FACTOR;
!               iw.minMergeDocs = INDEX_MIN_MERGE_DOCS;
!             }
!           } catch (Throwable t) {
!             // we can assume the data is invalid from now on - break here
!             LOG.info(" - segment " + name + " truncated to " + (i + 1) + " records");
!             break;
          }
        }
!       }
!       iw.optimize();
!       LOG.info("* Creating index took " + (System.currentTimeMillis() - s1) + " ms");
!       s1 = System.currentTimeMillis();
!       // merge all other indexes using the latest IndexWriter (still open):
!       if (masters.size() > 1) {
!         LOG.info(" - merging subindexes...");
!         stage = SegmentMergeStatus.STAGE_MERGEIDX;
!         IndexReader[] ireaders = new IndexReader[masters.size() - 1];
!         for (int i = 0; i < masters.size() - 1; i++) ireaders[i] = IndexReader.open((File)masters.get(i));
!         iw.addIndexes(ireaders);
!         for (int i = 0; i < masters.size() - 1; i++) {
!           ireaders[i].close();
!           FileUtil.fullyDelete((File)masters.get(i));
          }
        }
+       iw.close();
+       LOG.info("* Optimizing index took " + (System.currentTimeMillis() - s1) + " ms");
+       LOG.info("* Removing duplicate entries...");
+       stage = SegmentMergeStatus.STAGE_DEDUP;
        IndexReader ir = IndexReader.open(masterDir);
!       int i = 0;
!       long cnt = 0L;
!       processedRecords = 0L;
!       s1 = System.currentTimeMillis();
!       delta = s1;
!       TermEnum te = ir.terms();
!       while (te.next()) {
!         Term t = te.term();
!         if (t == null) continue;
!         if (!(t.field().equals("ch") || t.field().equals("uh"))) continue;
!         cnt++;
!         processedRecords = cnt / 2;
!         if (cnt > 0 && (cnt % (LOG_STEP * 2) == 0)) {
!           LOG.info(" Processed " + processedRecords + " records (" +
!             (float)(LOG_STEP * 1000)/(float)(System.currentTimeMillis() - delta) + " rec/s)");
!           delta = System.currentTimeMillis();
!         }
!         // Enumerate all docs with the same URL hash or content hash
!         TermDocs td = ir.termDocs(t);
!         if (td == null) continue;
!         int id = -1;
!         String time = null;
!         Document doc = null;
!         // Keep only the latest version of the document with
!         // the same hash (url or content). Note: even if the content
!         // hash is identical, other metadata may be different, so even
!         // in this case it makes sense to keep the latest version.
!         while (td.next()) {
!           int docid = td.doc();
!           if (!ir.isDeleted(docid)) {
!             doc = ir.document(docid);
!             if (time == null) {
!               time = doc.get("time");
!               id = docid;
!               continue;
!             }
!             String dtime = doc.get("time");
!             // "time" is a DateField, and can be compared lexicographically
!             if (dtime.compareTo(time) > 0) {
!               if (id != -1) {
!                 ir.delete(id);
!               }
!               time = dtime;
!               id = docid;
!             } else {
!               ir.delete(docid);
!             }
!           }
!         }
!       }
!       //
!       // keep the IndexReader open...
!       //
! 
!       LOG.info("* Deduplicating took " + (System.currentTimeMillis() - s1) + " ms");
!       stage = SegmentMergeStatus.STAGE_WRITING;
!       processedRecords = 0L;
!       Vector outDirs = new Vector();
!       File outDir = new File(output, SegmentWriter.getNewSegmentName());
!       outDirs.add(outDir);
!       LOG.info("* Merging all segments into " + output.getName());
!       s1 = System.currentTimeMillis();
!       delta = s1;
!       nfs.mkdirs(outDir);
!       SegmentWriter sw = new SegmentWriter(nfs, outDir, true);
!       LOG.fine(" - opening first output segment in " + outDir.getName());
        FetcherOutput fo = new FetcherOutput();
        Content co = new Content();
        ParseText pt = new ParseText();
        ParseData pd = new ParseData();
        int outputCnt = 0;
!       for (int n = 0; n < ir.maxDoc(); n++) {
!         if (ir.isDeleted(n)) {
!           //System.out.println("-del");
            continue;
          }
!         Document doc = ir.document(n);
!         String segDoc = doc.get("sd");
!         int idx = segDoc.indexOf('|');
!         String segName = segDoc.substring(0, idx);
!         String docName = segDoc.substring(idx + 1);
!         SegmentReader sr = (SegmentReader) readers.get(segName);
!         long docid;
          try {
!           docid = Long.parseLong(docName);
          } catch (Exception e) {
            continue;
          }
          try {
!           // get data from the reader
!           sr.get(docid, fo, co, pt, pd);
!         } catch (Throwable thr) {
!           // don't break the loop, because only one of the segments
!           // may be corrupted...
!           LOG.fine(" - corrupt record no. " + docid + " in segment " + sr.segmentDir.getName() + " - skipping.");
!           continue;
          }
!         sw.append(fo, co, pt, pd);
          outputCnt++;
+         processedRecords++;
+         if (processedRecords > 0 && (processedRecords % LOG_STEP == 0)) {
+           LOG.info(" Processed " + processedRecords + " records (" +
+             (float)(LOG_STEP * 1000)/(float)(System.currentTimeMillis() - delta) + " rec/s)");
+           delta = System.currentTimeMillis();
+         }
+         if (processedRecords % maxCount == 0) {
+           sw.close();
+           outDir = new File(output, SegmentWriter.getNewSegmentName());
+           LOG.fine(" - starting next output segment in " + outDir.getName());
+           nfs.mkdirs(outDir);
+           sw = new SegmentWriter(nfs, outDir, true);
+           outDirs.add(outDir);
+         }
        }
+       LOG.info("* Merging took " + (System.currentTimeMillis() - s1) + " ms");
        ir.close();
!       sw.close();
!       FileUtil.fullyDelete(fsmtIndexDir);
!       for (Iterator iter = readers.keySet().iterator(); iter.hasNext();) {
!         SegmentReader sr = (SegmentReader) readers.get(iter.next());
          sr.close();
        }
        if (runIndexer) {
!         stage = SegmentMergeStatus.STAGE_INDEXING;
!         totalRecords = outDirs.size();
!         processedRecords = 0L;
!         LOG.info("* Creating new segment index(es)...");
!         File workingDir = new File(output, "indexsegment-workingdir");
!         for (int k = 0; k < outDirs.size(); k++) {
!           processedRecords++;
!           if (workingDir.exists()) {
!             FileUtil.fullyDelete(workingDir);
!           }
!           IndexSegment indexer = new IndexSegment(nfs, Integer.MAX_VALUE,
!             (File)outDirs.get(k), workingDir);
!           indexer.indexPages();
!           FileUtil.fullyDelete(workingDir);
!         }
        }
        if (delSegs) {
          // This also deletes all corrupt segments, which are
          // unusable anyway
!         stage = SegmentMergeStatus.STAGE_DELETING;
!         totalRecords = allsegdirs.size();
!         processedRecords = 0L;
!         LOG.info("* Deleting old segments...");
!         for (int k = 0; k < allsegdirs.size(); k++) {
!           processedRecords++;
!           FileUtil.fullyDelete((File) allsegdirs.get(k));
          }
        }
!       delta = System.currentTimeMillis() - start;
!       float eps = (float) total / (float) (delta / 1000);
!       LOG.info("Finished SegmentMergeTool: INPUT: " + total + " -> OUTPUT: " + outputCnt + " entries in "
!         + ((float) delta / 1000f) + " s (" + eps + " entries/sec).");
      } catch (Exception e) {
        e.printStackTrace();
***************
*** 309,371 ****
      }
    }
!   public static void main(String[] args) throws Exception {
!     if (args.length < 2) {
        System.err.println("Too few arguments.\n");
        usage();
        System.exit(-1);
      }
!     boolean createMaster = false;
      boolean runIndexer = false;
      boolean delSegs = false;
!     boolean delMaster = false;
!     String index = null;
!     String output = null;
!     for (int i = 1; i < args.length; i++) {
!       if (args[i].equals("-m")) {
          if (args.length > i + 1) {
!           index = args[++i];
            continue;
          } else {
!           System.err.println("Required value of '-m' argument missing.\n");
            usage();
!           System.exit(-1);
          }
!       } else if (args[i].equals("-o")) {
!         if (args.length > i + 1) {
!           output = args[++i];
!           continue;
!         } else {
!           System.err.println("Required value of '-o' argument missing.\n");
!           usage();
!           System.exit(-1);
          }
        }
-       else if (args[i].equals("-cm")) createMaster = true;
-       else if (args[i].equals("-i")) runIndexer = true;
-       else if (args[i].equals("-ds")) delSegs = true;
-       else if (args[i].equals("-dm")) delMaster = true;
-     }
!     if (index == null && !createMaster) {
!       System.err.println("No master index, and createMaster == false.\n");
!       usage();
!       System.exit(-1);
      }
!     if (output == null) output = args[0];
!     SegmentMergeTool st = new SegmentMergeTool(args[0], output, index,
!       createMaster, runIndexer, delSegs, delMaster);
      st.run();
    }
!   private static void usage() {
!     System.err.println("SegmentMergeTool <input_segments_dir> [-o <output_segment_dir>] [-m <unique_index_dir> | -cm] [-i] [-ds] [-dm]");
!     System.err.println("\t<input_segments_dir>\tpath to directory containing\n\t\t\t\tall input segments");
!     System.err.println("\t-o <output_segment_dir>\t(optional) path to directory which will\n\t\t\t\tcontain a single output segment.\n\t\t\tNOTE: If not present, the original segments path will be used.");
!     System.err.println("\n\t-m <unique_index_dir>\tpath to 'master' unique\n\t\t\t\tindex for all input segments.\n\t\t\tNOTE: either this or the '-cm' option MUST be used!");
!     System.err.println("\t-cm\t\t(optional) create the 'master' index first,\n\t\t\tby running IndexSegment on each segment,\n\t\t\tthen DeleteDuplicates, and then IndexMerger.");
!     System.err.println("\t-i\t\t(optional) index the output segment.");
!     System.err.println("\t-ds\t\t(optional) delete the original segments when finished.");
!     System.err.println("\t-dm\t\t(optional) delete the original master index when finished.");
      System.err.println();
    }
--- 433,511 ----
      }
    }
!   public static void main(String[] args) throws Exception {
!     if (args.length < 1) {
        System.err.println("Too few arguments.\n");
        usage();
        System.exit(-1);
      }
!     NutchFileSystem nfs = NutchFileSystem.parseArgs(args, 0);
      boolean runIndexer = false;
      boolean delSegs = false;
!     long maxCount = Long.MAX_VALUE;
!     String segDir = null;
!     File output = null;
!     Vector dirs = new Vector();
!     for (int i = 0; i < args.length; i++) {
!       if (args[i] == null) continue;
!       if (args[i].equals("-o")) {
          if (args.length > i + 1) {
!           output = new File(args[++i]);
            continue;
          } else {
!           LOG.severe("Required value of '-o' argument missing.\n");
            usage();
!           return;
          }
!       } else if (args[i].equals("-i")) {
!         runIndexer = true;
!       } else if (args[i].equals("-cm")) {
!         LOG.warning("'-cm' option obsolete - ignored.");
!       } else if (args[i].equals("-max")) {
!         String cnt = args[++i];
!         try {
!           maxCount = Long.parseLong(cnt);
!         } catch (Exception e) {
!           LOG.warning("Invalid count '" + cnt + "', setting to Long.MAX_VALUE.");
!         }
!       } else if (args[i].equals("-ds")) {
!         delSegs = true;
!       } else if (args[i].equals("-dir")) {
!         segDir = args[++i];
!       } else dirs.add(new File(args[i]));
!     }
!     if (segDir != null) {
!       File sDir = new File(segDir);
!       if (!sDir.exists() || !sDir.isDirectory()) {
!         LOG.warning("Invalid path: " + sDir);
!       } else {
!         File[] files = sDir.listFiles(new FileFilter() {
!           public boolean accept(File f) {
!             return f.isDirectory();
!           }
!         });
!         if (files != null && files.length > 0) {
!           for (int i = 0; i < files.length; i++) dirs.add(files[i]);
          }
        }
      }
!     if (dirs.size() == 0) {
!       LOG.severe("No input segments.");
!       return;
      }
!     if (output == null) output = (File)dirs.get(0);
!     SegmentMergeTool st = new SegmentMergeTool(nfs, (File[])dirs.toArray(new File[0]),
!       output, maxCount, runIndexer, delSegs);
      st.run();
    }
!   private static void usage() {
!     System.err.println("SegmentMergeTool (-local | -nfs ...) (-dir <input_segments_dir> | seg1 seg2 ...) [-o <output_segments_dir>] [-max count] [-i] [-ds]");
!     System.err.println("\t-dir <input_segments_dir>\tpath to directory containing input segments");
!     System.err.println("\tseg1 seg2 seg3\t\tindividual paths to input segments");
!     System.err.println("\t-o <output_segment_dir>\t(optional) path to directory which will\n\t\t\t\tcontain output segment(s).\n\t\t\tNOTE: If not present, the original segments path will be used.");
!     System.err.println("\t-max count\t(optional) output multiple segments, each with maximum 'count' entries");
!     System.err.println("\t-i\t\t(optional) index the output segment when finished merging.");
!     System.err.println("\t-ds\t\t(optional) delete the original input segments when finished.");
      System.err.println();
    }
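
Two usage illustrations follow. First, a typical command-line invocation,
assuming a local filesystem and an existing "segments" directory (the options
come from the usage() synopsis above; the exact launcher invocation may differ
in your checkout):

    bin/nutch net.nutch.tools.SegmentMergeTool -local -dir segments \
        -o segments_merged -max 1000000 -i -ds

This merges all segments found under "segments" into one or more deduplicated
output segments of at most 1,000,000 records each under "segments_merged",
indexes the result, and then deletes the original input segments.

Second, since the class now implements Runnable and exposes getStatus(), it
can be embedded and monitored from another thread. A minimal sketch, assuming
only the constructor, getStatus() and the SegmentMergeStatus fields shown in
the diff above; the monitor loop and the directory names are illustrative, not
part of the committed code:

    import java.io.File;
    import java.io.FileFilter;
    import net.nutch.tools.SegmentMergeTool;
    import net.nutch.util.NutchFileSystem;

    public class MergeMonitor {
      public static void main(String[] args) throws Exception {
        // Local filesystem, obtained the same way SegmentMergeTool.main() does.
        NutchFileSystem nfs = NutchFileSystem.parseArgs(new String[] { "-local" }, 0);
        // Collect only directories - the Javadoc warns that passing
        // non-segment entries is dangerous when input deletion is enabled.
        File[] segs = new File("segments").listFiles(new FileFilter() {
          public boolean accept(File f) { return f.isDirectory(); }
        });
        // maxCount == 0 selects the default Long.MAX_VALUE (per the constructor
        // Javadoc); runIndexer == true, delSegs == false here.
        SegmentMergeTool tool = new SegmentMergeTool(nfs, segs,
            new File("segments_merged"), 0L, true, false);
        Thread worker = new Thread(tool);  // SegmentMergeTool implements Runnable
        worker.start();
        while (worker.isAlive()) {
          SegmentMergeTool.SegmentMergeStatus s = tool.getStatus();
          System.out.println(SegmentMergeTool.SegmentMergeStatus.stages[s.stage]
              + ": " + s.processedRecords + "/" + s.totalRecords + " records");
          worker.join(5000);  // poll progress every 5 seconds
        }
      }
    }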