Author: ab
Date: Tue May 30 14:12:52 2006
New Revision: 410377

URL: http://svn.apache.org/viewvc?rev=410377&view=rev
Log:
SegmentMerger bug-fixes and improvements:

* replace deprecated use of java.io.File with Hadoop's Path.

* the old segment name stored in Content.metadata needs to be replaced
  with the new segment name. Leaving the old name in place was causing
  NPEs when getting hit summaries.

* SegmentMerger will now always create its output in a subdirectory
  of the output_dir argument. All newly created segments will follow
  the same naming convention as other segments (i.e. yyyyMMddHHmmss),
  and use sequential suffixes for sliced segments (yyyyMMddHHmmss-NN).

Rename Generator.getDate() to Generator.generateSegmentName(), and
make it public. Additionally, this method now tries to ensure that
unique segment names are created (it is synchronized and sleeps for
one second before formatting the current timestamp).

Modified:
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java
    lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java?rev=410377&r1=410376&r2=410377&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java Tue May 30 14:12:52 2006
@@ -243,7 +243,7 @@
                "/generate-temp-"+
                Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
 
-    Path segment = new Path(segments, getDate());
+    Path segment = new Path(segments, generateSegmentName());
     Path output = new Path(segment, CrawlDatum.GENERATE_DIR_NAME);
 
     LOG.info("Generator: starting");
@@ -305,9 +305,14 @@
 
     return segment;
   }
+  
+  private static SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
 
-  private static String getDate() {
-    return new SimpleDateFormat("yyyyMMddHHmmss").format
+  public static synchronized String generateSegmentName() {
+    try {
+      Thread.sleep(1000);
+    } catch (Throwable t) {};
+    return sdf.format
       (new Date(System.currentTimeMillis()));
   }
 

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java?rev=410377&r1=410376&r2=410377&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java Tue May 30 14:12:52 2006
@@ -16,17 +16,19 @@
 
 package org.apache.nutch.segment;
 
-import java.io.File;
-import java.io.FileFilter;
 import java.io.IOException;
 import java.util.*;
 import java.util.logging.Logger;
 
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.*;
 import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.Generator;
+import org.apache.nutch.fetcher.Fetcher;
 import org.apache.nutch.metadata.Metadata;
 import org.apache.nutch.net.URLFilters;
 import org.apache.nutch.parse.ParseData;
@@ -104,7 +106,7 @@
 
       reporter.setStatus(split.toString());
       // find part name
-      String dir = split.getFile().toString().replace('\\', '/');
+      String dir = split.getPath().toString().replace('\\', '/');
       int idx = dir.lastIndexOf("/part-");
       if (idx == -1) {
         throw new IOException("Cannot determine segment part: " + dir);
@@ -173,6 +175,7 @@
         SequenceFile.Writer g_out = null;
         SequenceFile.Writer p_out = null;
         HashMap sliceWriters = new HashMap();
+        String segmentName = job.get("segment.merger.segmentName");
         
         public void write(WritableComparable key, Writable value) throws 
IOException {
           // unwrap
@@ -205,12 +208,24 @@
             slice = ((Content)o).getMetadata().get(sliceMarker);
             ((Content)o).getMetadata().remove(sliceMarker);
             ((Content)o).getMetadata().remove(nameMarker);
+            // update the segment name inside metadata
+            if (slice == null) {
+              ((Content)o).getMetadata().set(Fetcher.SEGMENT_NAME_KEY, 
segmentName);
+            } else {
+              ((Content)o).getMetadata().set(Fetcher.SEGMENT_NAME_KEY, 
segmentName + "-" + slice);
+            }
             c_out = ensureMapFile(slice, Content.DIR_NAME, Content.class);
             c_out.append(key, o);
           } else if (o instanceof ParseData) {
             slice = ((ParseData)o).getParseMeta().get(sliceMarker);
             ((ParseData)o).getParseMeta().remove(sliceMarker);
             ((ParseData)o).getParseMeta().remove(nameMarker);
+            // update the segment name inside contentMeta - required by Indexer
+            if (slice == null) {
+              ((ParseData)o).getContentMeta().set(Fetcher.SEGMENT_NAME_KEY, 
segmentName);
+            } else {
+              ((ParseData)o).getContentMeta().set(Fetcher.SEGMENT_NAME_KEY, 
segmentName + "-" + slice);
+            }
             pd_out = ensureMapFile(slice, ParseData.DIR_NAME, ParseData.class);
             pd_out.append(key, o);
           } else if (o instanceof ParseText) {
@@ -243,11 +258,11 @@
           if (slice == null) slice = DEFAULT_SLICE;
           SequenceFile.Writer res = 
(SequenceFile.Writer)sliceWriters.get(slice + dirName);
           if (res != null) return res;
-          String wname;
+          Path wname;
           if (slice == DEFAULT_SLICE) {
-            wname = new File(new File(job.getOutputDir(), dirName), 
name).toString();
+            wname = new Path(new Path(new Path(job.getOutputPath(), 
segmentName), dirName), name);
           } else {
-            wname = new File(new File(new File(job.getOutputDir(), slice), 
dirName), name).toString();
+            wname = new Path(new Path(new Path(job.getOutputPath(), 
segmentName + "-" + slice), dirName), name);
           }
           res = new SequenceFile.Writer(fs, wname, UTF8.class, 
CrawlDatum.class);
           sliceWriters.put(slice + dirName, res);
@@ -259,13 +274,13 @@
           if (slice == null) slice = DEFAULT_SLICE;
           MapFile.Writer res = (MapFile.Writer)sliceWriters.get(slice + 
dirName);
           if (res != null) return res;
-          String wname;
+          Path wname;
           if (slice == DEFAULT_SLICE) {
-            wname = new File(new File(job.getOutputDir(), dirName), 
name).toString();
+            wname = new Path(new Path(new Path(job.getOutputPath(), 
segmentName), dirName), name);
           } else {
-            wname = new File(new File(new File(job.getOutputDir(), slice), 
dirName), name).toString();
+            wname = new Path(new Path(new Path(job.getOutputPath(), 
segmentName + "-" + slice), dirName), name);
           }
-          res = new MapFile.Writer(fs, wname, UTF8.class, clazz);
+          res = new MapFile.Writer(fs, wname.toString(), UTF8.class, clazz);
           sliceWriters.put(slice + dirName, res);
           return res;
         }
@@ -512,12 +527,14 @@
     }
   }
 
-  public void merge(File out, File[] segs, boolean filter, long slice) throws 
Exception {
-    LOG.info("Merging " + segs.length + " segments to " + out);
+  public void merge(Path out, Path[] segs, boolean filter, long slice) throws 
Exception {
+    String segmentName = Generator.generateSegmentName();
     JobConf job = new JobConf(getConf());
-    job.setJobName("mergesegs " + out);
+    job.setJobName("mergesegs " + out + "/" + segmentName);
+    LOG.info("Merging " + segs.length + " segments to " + out + "/" + 
segmentName);
     job.setBoolean("segment.merger.filter", filter);
     job.setLong("segment.merger.slice", slice);
+    job.set("segment.merger.segmentName", segmentName);
     FileSystem fs = FileSystem.get(getConf());
     // prepare the minimal common set of input dirs
     boolean g = true;
@@ -533,12 +550,12 @@
         continue;
       }
       LOG.info("SegmentMerger:   adding " + segs[i]);
-      File cDir = new File(segs[i], Content.DIR_NAME);
-      File gDir = new File(segs[i], CrawlDatum.GENERATE_DIR_NAME);
-      File fDir = new File(segs[i], CrawlDatum.FETCH_DIR_NAME);
-      File pDir = new File(segs[i], CrawlDatum.PARSE_DIR_NAME);
-      File pdDir = new File(segs[i], ParseData.DIR_NAME);
-      File ptDir = new File(segs[i], ParseText.DIR_NAME);
+      Path cDir = new Path(segs[i], Content.DIR_NAME);
+      Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME);
+      Path fDir = new Path(segs[i], CrawlDatum.FETCH_DIR_NAME);
+      Path pDir = new Path(segs[i], CrawlDatum.PARSE_DIR_NAME);
+      Path pdDir = new Path(segs[i], ParseData.DIR_NAME);
+      Path ptDir = new Path(segs[i], ParseText.DIR_NAME);
       c = c && fs.exists(cDir);
       g = g && fs.exists(gDir);
       f = f && fs.exists(fDir);
@@ -557,28 +574,28 @@
     for (int i = 0; i < segs.length; i++) {
       if (segs[i] == null) continue;
       if (g) {
-        File gDir = new File(segs[i], CrawlDatum.GENERATE_DIR_NAME);
-        job.addInputDir(gDir);
+        Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME);
+        job.addInputPath(gDir);
       }
       if (c) {
-        File cDir = new File(segs[i], Content.DIR_NAME);
-        job.addInputDir(cDir);
+        Path cDir = new Path(segs[i], Content.DIR_NAME);
+        job.addInputPath(cDir);
       }
       if (f) {
-        File fDir = new File(segs[i], CrawlDatum.FETCH_DIR_NAME);
-        job.addInputDir(fDir);
+        Path fDir = new Path(segs[i], CrawlDatum.FETCH_DIR_NAME);
+        job.addInputPath(fDir);
       }
       if (p) {
-        File pDir = new File(segs[i], CrawlDatum.PARSE_DIR_NAME);
-        job.addInputDir(pDir);
+        Path pDir = new Path(segs[i], CrawlDatum.PARSE_DIR_NAME);
+        job.addInputPath(pDir);
       }
       if (pd) {
-        File pdDir = new File(segs[i], ParseData.DIR_NAME);
-        job.addInputDir(pdDir);
+        Path pdDir = new Path(segs[i], ParseData.DIR_NAME);
+        job.addInputPath(pdDir);
       }
       if (pt) {
-        File ptDir = new File(segs[i], ParseText.DIR_NAME);
-        job.addInputDir(ptDir);
+        Path ptDir = new Path(segs[i], ParseText.DIR_NAME);
+        job.addInputPath(ptDir);
       }
     }
     job.setInputFormat(ObjectInputFormat.class);
@@ -586,7 +603,7 @@
     job.setInputValueClass(ObjectWritable.class);
     job.setMapperClass(SegmentMerger.class);
     job.setReducerClass(SegmentMerger.class);
-    job.setOutputDir(out);
+    job.setOutputPath(out);
     job.setOutputKeyClass(UTF8.class);
     job.setOutputValueClass(ObjectWritable.class);
     job.setOutputFormat(SegmentOutputFormat.class);
@@ -602,7 +619,7 @@
   public static void main(String[] args) throws Exception {
     if (args.length < 2) {
       System.err.println("SegmentMerger output_dir (-dir segments | seg1 seg2 
...) [-filter] [-slice NNNN]");
-      System.err.println("\toutput_dir\tname of the resulting segment, or the 
parent dir of segment slices");
+      System.err.println("\toutput_dir\tname of the parent dir for output 
segment slice(s)");
       System.err.println("\t-dir segments\tparent dir containing several 
segments");
       System.err.println("\tseg1 seg2 ...\tlist of segment dirs");
       System.err.println("\t-filter\t\tfilter out URL-s prohibited by current 
URLFilters");
@@ -611,14 +628,14 @@
     }
     Configuration conf = NutchConfiguration.create();
     final FileSystem fs = FileSystem.get(conf);
-    File out = new File(args[0]);
+    Path out = new Path(args[0]);
     ArrayList segs = new ArrayList();
     long sliceSize = 0;
     boolean filter = false;
     for (int i = 1; i < args.length; i++) {
       if (args[i].equals("-dir")) {
-        File[] files = fs.listFiles(new File(args[++i]), new FileFilter() {
-          public boolean accept(File f) {
+        Path[] files = fs.listPaths(new Path(args[++i]), new PathFilter() {
+          public boolean accept(Path f) {
             try {
               if (fs.isDirectory(f)) return true;
             } catch (IOException e) {}
@@ -633,7 +650,7 @@
       } else if (args[i].equals("-slice")) {
         sliceSize = Long.parseLong(args[++i]);
       } else {
-        segs.add(new File(args[i]));
+        segs.add(new Path(args[i]));
       }
     }
     if (segs.size() == 0) {
@@ -641,7 +658,7 @@
       return;
     }
     SegmentMerger merger = new SegmentMerger(conf);
-    merger.merge(out, (File[]) segs.toArray(new File[segs.size()]), filter, 
sliceSize);
+    merger.merge(out, (Path[]) segs.toArray(new Path[segs.size()]), filter, 
sliceSize);
   }
 
 }




-------------------------------------------------------
All the advantages of Linux Managed Hosting--Without the Cost and Risk!
Fully trained technicians. The highest number of Red Hat certifications in
the hosting industry. Fanatical Support. Click to learn more
http://sel.as-us.falkag.net/sel?cmd=lnk&kid=107521&bid=248729&dat=121642
_______________________________________________
Nutch-cvs mailing list
Nutch-cvs@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nutch-cvs

Reply via email to