Author: ab
Date: Tue May 30 14:12:52 2006
New Revision: 410377

URL: http://svn.apache.org/viewvc?rev=410377&view=rev
Log:
SegmentMerger bug-fixes and improvements:
* Replace deprecated use of java.io.File with Hadoop's Path.

* The old segment name in Content.metadata needs to be replaced with the new
  segment name. This was causing NPEs when getting hit summaries.

* SegmentMerger will now always create its output in a subdirectory of the
  output_dir argument. All newly created segments will follow the same naming
  convention as other segments (i.e. yyyyMMddHHmmss), and use sequential
  suffixes for sliced segments (yyyyMMddHHmmss-NN). Rename Generator.getDate()
  to Generator.generateSegmentName(), and make it public. Additionally, this
  method will now try to ensure that unique segment names are created.

Modified:
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java
    lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java?rev=410377&r1=410376&r2=410377&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java Tue May 30 14:12:52 2006
@@ -243,7 +243,7 @@
                    "/generate-temp-"+
                    Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

-    Path segment = new Path(segments, getDate());
+    Path segment = new Path(segments, generateSegmentName());
     Path output = new Path(segment, CrawlDatum.GENERATE_DIR_NAME);

     LOG.info("Generator: starting");
@@ -305,9 +305,14 @@

     return segment;
   }
+
+  private static SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");

-  private static String getDate() {
-    return new SimpleDateFormat("yyyyMMddHHmmss").format
+  public static synchronized String generateSegmentName() {
+    try {
+      Thread.sleep(1000);
+    } catch (Throwable t) {};
+    return sdf.format
       (new Date(System.currentTimeMillis()));
   }
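The uniqueness guarantee in generateSegmentName() above comes from two things
working together: the method is synchronized, so only one thread formats a
name at a time, and the one-second sleep ensures consecutive calls fall into
different seconds of the second-resolution timestamp. A minimal standalone
sketch of the same pattern (class and method names here are illustrative, not
part of this commit; note the guarantee only holds within a single JVM):

    import java.text.SimpleDateFormat;
    import java.util.Date;

    // Illustrative sketch mirroring the pattern used by
    // Generator.generateSegmentName() in this commit.
    public class SegmentNames {
      private static final SimpleDateFormat SDF =
          new SimpleDateFormat("yyyyMMddHHmmss");

      // synchronized: one caller at a time formats a name;
      // sleep(1000): consecutive calls land in different seconds,
      // so two names generated in one JVM can never collide.
      public static synchronized String next() {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        return SDF.format(new Date(System.currentTimeMillis()));
      }

      public static void main(String[] args) {
        System.out.println(next());
        System.out.println(next()); // always different from the first
      }
    }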
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java?rev=410377&r1=410376&r2=410377&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java Tue May 30 14:12:52 2006
@@ -16,17 +16,19 @@

 package org.apache.nutch.segment;

-import java.io.File;
-import java.io.FileFilter;
 import java.io.IOException;
 import java.util.*;
 import java.util.logging.Logger;

 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.*;
 import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.Generator;
+import org.apache.nutch.fetcher.Fetcher;
 import org.apache.nutch.metadata.Metadata;
 import org.apache.nutch.net.URLFilters;
 import org.apache.nutch.parse.ParseData;
@@ -104,7 +106,7 @@
       reporter.setStatus(split.toString());

       // find part name
-      String dir = split.getFile().toString().replace('\\', '/');
+      String dir = split.getPath().toString().replace('\\', '/');
       int idx = dir.lastIndexOf("/part-");
       if (idx == -1) {
         throw new IOException("Cannot determine segment part: " + dir);
@@ -173,6 +175,7 @@
       SequenceFile.Writer g_out = null;
       SequenceFile.Writer p_out = null;
       HashMap sliceWriters = new HashMap();
+      String segmentName = job.get("segment.merger.segmentName");

       public void write(WritableComparable key, Writable value) throws IOException {
         // unwrap
@@ -205,12 +208,24 @@
           slice = ((Content)o).getMetadata().get(sliceMarker);
           ((Content)o).getMetadata().remove(sliceMarker);
           ((Content)o).getMetadata().remove(nameMarker);
+          // update the segment name inside metadata
+          if (slice == null) {
+            ((Content)o).getMetadata().set(Fetcher.SEGMENT_NAME_KEY, segmentName);
+          } else {
+            ((Content)o).getMetadata().set(Fetcher.SEGMENT_NAME_KEY, segmentName + "-" + slice);
+          }
           c_out = ensureMapFile(slice, Content.DIR_NAME, Content.class);
           c_out.append(key, o);
         } else if (o instanceof ParseData) {
           slice = ((ParseData)o).getParseMeta().get(sliceMarker);
           ((ParseData)o).getParseMeta().remove(sliceMarker);
           ((ParseData)o).getParseMeta().remove(nameMarker);
+          // update the segment name inside contentMeta - required by Indexer
+          if (slice == null) {
+            ((ParseData)o).getContentMeta().set(Fetcher.SEGMENT_NAME_KEY, segmentName);
+          } else {
+            ((ParseData)o).getContentMeta().set(Fetcher.SEGMENT_NAME_KEY, segmentName + "-" + slice);
+          }
           pd_out = ensureMapFile(slice, ParseData.DIR_NAME, ParseData.class);
           pd_out.append(key, o);
         } else if (o instanceof ParseText) {
@@ -243,11 +258,11 @@
       if (slice == null) slice = DEFAULT_SLICE;
       SequenceFile.Writer res = (SequenceFile.Writer)sliceWriters.get(slice + dirName);
       if (res != null) return res;
-      String wname;
+      Path wname;
       if (slice == DEFAULT_SLICE) {
-        wname = new File(new File(job.getOutputDir(), dirName), name).toString();
+        wname = new Path(new Path(new Path(job.getOutputPath(), segmentName), dirName), name);
       } else {
-        wname = new File(new File(new File(job.getOutputDir(), slice), dirName), name).toString();
+        wname = new Path(new Path(new Path(job.getOutputPath(), segmentName + "-" + slice), dirName), name);
       }
       res = new SequenceFile.Writer(fs, wname, UTF8.class, CrawlDatum.class);
       sliceWriters.put(slice + dirName, res);
@@ -259,13 +274,13 @@
       if (slice == null) slice = DEFAULT_SLICE;
       MapFile.Writer res = (MapFile.Writer)sliceWriters.get(slice + dirName);
       if (res != null) return res;
-      String wname;
+      Path wname;
       if (slice == DEFAULT_SLICE) {
-        wname = new File(new File(job.getOutputDir(), dirName), name).toString();
+        wname = new Path(new Path(new Path(job.getOutputPath(), segmentName), dirName), name);
       } else {
-        wname = new File(new File(new File(job.getOutputDir(), slice), dirName), name).toString();
+        wname = new Path(new Path(new Path(job.getOutputPath(), segmentName + "-" + slice), dirName), name);
       }
-      res = new MapFile.Writer(fs, wname, UTF8.class, clazz);
+      res = new MapFile.Writer(fs, wname.toString(), UTF8.class, clazz);
       sliceWriters.put(slice + dirName, res);
       return res;
     }
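The hit-summary NPE fix above reduces to one rule: every record written into
the merged segment must carry the name of the segment it now lives in, because
the summarizer and Indexer later read that key to locate the record, and the
old segment directory no longer exists after a merge. A hedged sketch of just
that rule, reusing the real Fetcher.SEGMENT_NAME_KEY and Metadata API from the
diff (the helper class, method, and sample values are illustrative):

    import org.apache.nutch.fetcher.Fetcher;
    import org.apache.nutch.metadata.Metadata;

    // Sketch: stamp the NEW segment name into a record's metadata.
    // Sliced output lands in "<name>-<NN>", so for a sliced record the
    // metadata must point at the slice directory, not the base name.
    public class SegmentNameStamp {
      static void stamp(Metadata meta, String segmentName, String slice) {
        if (slice == null) {
          meta.set(Fetcher.SEGMENT_NAME_KEY, segmentName);
        } else {
          meta.set(Fetcher.SEGMENT_NAME_KEY, segmentName + "-" + slice);
        }
      }

      public static void main(String[] args) {
        Metadata meta = new Metadata();
        stamp(meta, "20060530141252", "00");              // illustrative values
        System.out.println(meta.get(Fetcher.SEGMENT_NAME_KEY)); // 20060530141252-00
      }
    }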
@@ -512,12 +527,14 @@
     }
   }

-  public void merge(File out, File[] segs, boolean filter, long slice) throws Exception {
-    LOG.info("Merging " + segs.length + " segments to " + out);
+  public void merge(Path out, Path[] segs, boolean filter, long slice) throws Exception {
+    String segmentName = Generator.generateSegmentName();
     JobConf job = new JobConf(getConf());
-    job.setJobName("mergesegs " + out);
+    job.setJobName("mergesegs " + out + "/" + segmentName);
+    LOG.info("Merging " + segs.length + " segments to " + out + "/" + segmentName);
     job.setBoolean("segment.merger.filter", filter);
     job.setLong("segment.merger.slice", slice);
+    job.set("segment.merger.segmentName", segmentName);
     FileSystem fs = FileSystem.get(getConf());
     // prepare the minimal common set of input dirs
     boolean g = true;
@@ -533,12 +550,12 @@
         continue;
       }
       LOG.info("SegmentMerger: adding " + segs[i]);
-      File cDir = new File(segs[i], Content.DIR_NAME);
-      File gDir = new File(segs[i], CrawlDatum.GENERATE_DIR_NAME);
-      File fDir = new File(segs[i], CrawlDatum.FETCH_DIR_NAME);
-      File pDir = new File(segs[i], CrawlDatum.PARSE_DIR_NAME);
-      File pdDir = new File(segs[i], ParseData.DIR_NAME);
-      File ptDir = new File(segs[i], ParseText.DIR_NAME);
+      Path cDir = new Path(segs[i], Content.DIR_NAME);
+      Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME);
+      Path fDir = new Path(segs[i], CrawlDatum.FETCH_DIR_NAME);
+      Path pDir = new Path(segs[i], CrawlDatum.PARSE_DIR_NAME);
+      Path pdDir = new Path(segs[i], ParseData.DIR_NAME);
+      Path ptDir = new Path(segs[i], ParseText.DIR_NAME);
       c = c && fs.exists(cDir);
       g = g && fs.exists(gDir);
       f = f && fs.exists(fDir);
@@ -557,28 +574,28 @@
     for (int i = 0; i < segs.length; i++) {
       if (segs[i] == null) continue;
       if (g) {
-        File gDir = new File(segs[i], CrawlDatum.GENERATE_DIR_NAME);
-        job.addInputDir(gDir);
+        Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME);
+        job.addInputPath(gDir);
       }
       if (c) {
-        File cDir = new File(segs[i], Content.DIR_NAME);
-        job.addInputDir(cDir);
+        Path cDir = new Path(segs[i], Content.DIR_NAME);
+        job.addInputPath(cDir);
       }
       if (f) {
-        File fDir = new File(segs[i], CrawlDatum.FETCH_DIR_NAME);
-        job.addInputDir(fDir);
+        Path fDir = new Path(segs[i], CrawlDatum.FETCH_DIR_NAME);
+        job.addInputPath(fDir);
       }
       if (p) {
-        File pDir = new File(segs[i], CrawlDatum.PARSE_DIR_NAME);
-        job.addInputDir(pDir);
+        Path pDir = new Path(segs[i], CrawlDatum.PARSE_DIR_NAME);
+        job.addInputPath(pDir);
       }
       if (pd) {
-        File pdDir = new File(segs[i], ParseData.DIR_NAME);
-        job.addInputDir(pdDir);
+        Path pdDir = new Path(segs[i], ParseData.DIR_NAME);
+        job.addInputPath(pdDir);
       }
       if (pt) {
-        File ptDir = new File(segs[i], ParseText.DIR_NAME);
-        job.addInputDir(ptDir);
+        Path ptDir = new Path(segs[i], ParseText.DIR_NAME);
+        job.addInputPath(ptDir);
       }
     }
     job.setInputFormat(ObjectInputFormat.class);
@@ -586,7 +603,7 @@
     job.setInputValueClass(ObjectWritable.class);
     job.setMapperClass(SegmentMerger.class);
     job.setReducerClass(SegmentMerger.class);
-    job.setOutputDir(out);
+    job.setOutputPath(out);
     job.setOutputKeyClass(UTF8.class);
     job.setOutputValueClass(ObjectWritable.class);
     job.setOutputFormat(SegmentOutputFormat.class);
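Note how the new segment name travels from the job client to the reduce-side
output format: merge() stores it in the JobConf under the key
"segment.merger.segmentName", and SegmentOutputFormat reads it back with
job.get(). A minimal sketch of that round trip (the key is the one used above;
the timestamp value and class name are illustrative):

    import org.apache.hadoop.mapred.JobConf;

    // Sketch: the JobConf is the channel between the job client and the
    // tasks, so per-job values such as the target segment name are passed
    // as string properties.
    public class ConfRoundTrip {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // client side (SegmentMerger.merge):
        job.set("segment.merger.segmentName", "20060530141252");
        // task side (SegmentOutputFormat):
        String segmentName = job.get("segment.merger.segmentName");
        System.out.println(segmentName);
      }
    }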
@@ -602,7 +619,7 @@

   public static void main(String[] args) throws Exception {
     if (args.length < 2) {
       System.err.println("SegmentMerger output_dir (-dir segments | seg1 seg2 ...) [-filter] [-slice NNNN]");
-      System.err.println("\toutput_dir\tname of the resulting segment, or the parent dir of segment slices");
+      System.err.println("\toutput_dir\tname of the parent dir for output segment slice(s)");
       System.err.println("\t-dir segments\tparent dir containing several segments");
       System.err.println("\tseg1 seg2 ...\tlist of segment dirs");
       System.err.println("\t-filter\t\tfilter out URL-s prohibited by current URLFilters");
@@ -611,14 +628,14 @@
     }
     Configuration conf = NutchConfiguration.create();
     final FileSystem fs = FileSystem.get(conf);
-    File out = new File(args[0]);
+    Path out = new Path(args[0]);
     ArrayList segs = new ArrayList();
     long sliceSize = 0;
     boolean filter = false;
     for (int i = 1; i < args.length; i++) {
       if (args[i].equals("-dir")) {
-        File[] files = fs.listFiles(new File(args[++i]), new FileFilter() {
-          public boolean accept(File f) {
+        Path[] files = fs.listPaths(new Path(args[++i]), new PathFilter() {
+          public boolean accept(Path f) {
             try {
               if (fs.isDirectory(f)) return true;
             } catch (IOException e) {}
@@ -633,7 +650,7 @@
       } else if (args[i].equals("-slice")) {
         sliceSize = Long.parseLong(args[++i]);
       } else {
-        segs.add(new File(args[i]));
+        segs.add(new Path(args[i]));
       }
     }
     if (segs.size() == 0) {
@@ -641,7 +658,7 @@
       return;
     }
     SegmentMerger merger = new SegmentMerger(conf);
-    merger.merge(out, (File[]) segs.toArray(new File[segs.size()]), filter, sliceSize);
+    merger.merge(out, (Path[]) segs.toArray(new Path[segs.size()]), filter, sliceSize);
   }
 }
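For callers that embed the merger instead of invoking main(), the same merge
can be driven programmatically. A sketch under the API shown in this diff
(the output dir and segment paths are placeholders):

    import org.apache.hadoop.fs.Path;
    import org.apache.nutch.segment.SegmentMerger;
    import org.apache.nutch.util.NutchConfiguration;

    // Sketch: merge two segments into a new, timestamp-named segment
    // created under "segments_merged", filtering URLs and not slicing.
    public class MergeExample {
      public static void main(String[] args) throws Exception {
        SegmentMerger merger = new SegmentMerger(NutchConfiguration.create());
        Path out = new Path("segments_merged");     // parent dir for the new segment
        Path[] segs = {
          new Path("segments/20060529120000"),      // placeholder segment names
          new Path("segments/20060530093000")
        };
        merger.merge(out, segs, true, 0L);          // filter=true, slice=0 (no slicing)
      }
    }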