Author: ab
Date: Tue Nov 14 11:44:39 2006
New Revision: 474939

URL: http://svn.apache.org/viewvc?view=rev&rev=474939
Log:
Refactor SegmentMerger to use MetaWrapper. This significantly simplifies the
code and removes the crude hacks previously used to pass segment-related
metadata around (marker entries injected into record metadata and ParseText).

Added:
    lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentPart.java   (with props)
Modified:
    lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java?view=diff&rev=474939&r1=474938&r2=474939
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java Tue Nov 14 11:44:39 2006
@@ -33,6 +33,7 @@
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.Generator;
 import org.apache.nutch.fetcher.Fetcher;
+import org.apache.nutch.metadata.MetaWrapper;
 import org.apache.nutch.metadata.Metadata;
 import org.apache.nutch.net.URLFilters;
 import org.apache.nutch.parse.ParseData;
@@ -90,19 +91,16 @@
 public class SegmentMerger extends Configured implements Mapper, Reducer {
   private static final Log LOG = LogFactory.getLog(SegmentMerger.class);
 
-  private static final Text SEGMENT_PART_KEY = new Text("_PaRt_");
-  private static final Text SEGMENT_NAME_KEY = new Text("_NaMe_");
-  private static final String nameMarker = SEGMENT_NAME_KEY.toString();
-  private static final Text SEGMENT_SLICE_KEY = new Text("_SlIcE_");
-  private static final String sliceMarker = SEGMENT_SLICE_KEY.toString();
+  private static final String SEGMENT_PART_KEY = "part";
+  private static final String SEGMENT_SLICE_KEY = "slice";
 
   private URLFilters filters = null;
   private long sliceSize = -1;
   private long curCount = 0;
   
   /**
-   * Wraps inputs in an [EMAIL PROTECTED] ObjectWritable}, to permit merging 
different
-   * types in reduce.
+   * Wraps inputs in an [EMAIL PROTECTED] MetaWrapper}, to permit merging 
different
+   * types in reduce and use additional metadata.
    */
   public static class ObjectInputFormat extends SequenceFileInputFormat {
     public RecordReader getRecordReader(FileSystem fs, FileSplit split, 
JobConf job, Reporter reporter)
@@ -110,68 +108,30 @@
 
       reporter.setStatus(split.toString());
       // find part name
-      String dir = split.getPath().toString().replace('\\', '/');
-      int idx = dir.lastIndexOf("/part-");
-      if (idx == -1) {
-        throw new IOException("Cannot determine segment part: " + dir);
-      }
-      dir = dir.substring(0, idx);
-      idx = dir.lastIndexOf('/');
-      if (idx == -1) {
-        throw new IOException("Cannot determine segment part: " + dir);
-      }
-      final String part = dir.substring(idx + 1);
-      // find segment name
-      dir = dir.substring(0, idx);
-      idx = dir.lastIndexOf('/');
-      if (idx == -1) {
-        throw new IOException("Cannot determine segment name: " + dir);
-      }
-      final String segment = dir.substring(idx + 1);
+      final SegmentPart segmentPart = SegmentPart.get(split);
+      final String spString = segmentPart.toString();
 
       return new SequenceFileRecordReader(job, split) {
         public synchronized boolean next(Writable key, Writable value) throws 
IOException {
-          ObjectWritable wrapper = (ObjectWritable) value;
+          MetaWrapper wrapper = (MetaWrapper) value;
           try {
             wrapper.set(getValueClass().newInstance());
           } catch (Exception e) {
             throw new IOException(e.toString());
           }
           boolean res = super.next(key, (Writable) wrapper.get());
-          Object o = wrapper.get();
-          if (o instanceof CrawlDatum) {
-            // record which part of segment this comes from
-            ((CrawlDatum)o).getMetaData().put(SEGMENT_PART_KEY, new 
Text(part));
-            ((CrawlDatum)o).getMetaData().put(SEGMENT_NAME_KEY, new 
Text(segment));
-          } else if (o instanceof Content) {
-            if (((Content)o).getMetadata() == null) {
-              ((Content)o).setMetadata(new Metadata());
-            }
-            ((Content)o).getMetadata().set(SEGMENT_NAME_KEY.toString(), 
segment);
-          } else if (o instanceof ParseData) {
-            if (((ParseData)o).getParseMeta() == null) {
-              ((ParseData)o).setParseMeta(new Metadata());
-            }
-            ((ParseData)o).getParseMeta().set(SEGMENT_NAME_KEY.toString(), 
segment);
-          } else if (o instanceof ParseText) {
-            String text = ((ParseText)o).getText();
-            o = new ParseText(SEGMENT_NAME_KEY.toString() +
-                    segment + SEGMENT_NAME_KEY.toString() + text);
-            wrapper.set(o);
-          } else {
-            throw new IOException("Unknown value type: " + 
o.getClass().getName() + "(" + o + ")");
-          }
+          wrapper.setMeta(SEGMENT_PART_KEY, spString);
           return res;
         }
         
         public Writable createValue() {
-          return new ObjectWritable();
+          return new MetaWrapper();
         }
       };
     }
   }
 
-  public static class SegmentOutputFormat extends 
org.apache.hadoop.mapred.OutputFormatBase {
+  public static class SegmentOutputFormat extends OutputFormatBase {
     private static final String DEFAULT_SLICE = "default";
     
     public RecordWriter getRecordWriter(final FileSystem fs, final JobConf 
job, final String name, final Progressable progress) throws IOException {
@@ -187,47 +147,27 @@
         
         public void write(WritableComparable key, Writable value) throws 
IOException {
           // unwrap
-          Writable o = (Writable)((ObjectWritable)value).get();
-          String slice = null;
+          MetaWrapper wrapper = (MetaWrapper)value;
+          SegmentPart sp = 
SegmentPart.parse(wrapper.getMeta(SEGMENT_PART_KEY));
+          Writable o = (Writable)wrapper.get();
+          String slice = wrapper.getMeta(SEGMENT_SLICE_KEY);
           if (o instanceof CrawlDatum) {
-            // check which output dir it should go into
-            Text part = 
(Text)((CrawlDatum)o).getMetaData().get(SEGMENT_PART_KEY);
-            ((CrawlDatum)o).getMetaData().remove(SEGMENT_PART_KEY);
-            ((CrawlDatum)o).getMetaData().remove(SEGMENT_NAME_KEY);
-            if (part == null)
-              throw new IOException("Null segment part, key=" + key);
-            Text uSlice = 
(Text)((CrawlDatum)o).getMetaData().get(SEGMENT_SLICE_KEY);
-            ((CrawlDatum)o).getMetaData().remove(SEGMENT_SLICE_KEY);
-            if (uSlice != null) slice = uSlice.toString();
-            String partString = part.toString();
-            if (partString.equals(CrawlDatum.GENERATE_DIR_NAME)) {
+            if (sp.partName.equals(CrawlDatum.GENERATE_DIR_NAME)) {
               g_out = ensureSequenceFile(slice, CrawlDatum.GENERATE_DIR_NAME);
               g_out.append(key, o);
-            } else if (partString.equals(CrawlDatum.FETCH_DIR_NAME)) {
+            } else if (sp.partName.equals(CrawlDatum.FETCH_DIR_NAME)) {
               f_out = ensureMapFile(slice, CrawlDatum.FETCH_DIR_NAME, 
CrawlDatum.class);
               f_out.append(key, o);
-            } else if (partString.equals(CrawlDatum.PARSE_DIR_NAME)) {
+            } else if (sp.partName.equals(CrawlDatum.PARSE_DIR_NAME)) {
               p_out = ensureSequenceFile(slice, CrawlDatum.PARSE_DIR_NAME);
               p_out.append(key, o);
             } else {
-              throw new IOException("Cannot determine segment part: " + 
partString);
+              throw new IOException("Cannot determine segment part: " + 
sp.partName);
             }
           } else if (o instanceof Content) {
-            slice = ((Content)o).getMetadata().get(sliceMarker);
-            ((Content)o).getMetadata().remove(sliceMarker);
-            ((Content)o).getMetadata().remove(nameMarker);
-            // update the segment name inside metadata
-            if (slice == null) {
-              ((Content)o).getMetadata().set(Fetcher.SEGMENT_NAME_KEY, 
segmentName);
-            } else {
-              ((Content)o).getMetadata().set(Fetcher.SEGMENT_NAME_KEY, 
segmentName + "-" + slice);
-            }
             c_out = ensureMapFile(slice, Content.DIR_NAME, Content.class);
             c_out.append(key, o);
           } else if (o instanceof ParseData) {
-            slice = ((ParseData)o).getParseMeta().get(sliceMarker);
-            ((ParseData)o).getParseMeta().remove(sliceMarker);
-            ((ParseData)o).getParseMeta().remove(nameMarker);
             // update the segment name inside contentMeta - required by Indexer
             if (slice == null) {
               ((ParseData)o).getContentMeta().set(Fetcher.SEGMENT_NAME_KEY, 
segmentName);
@@ -237,25 +177,6 @@
             pd_out = ensureMapFile(slice, ParseData.DIR_NAME, ParseData.class);
             pd_out.append(key, o);
           } else if (o instanceof ParseText) {
-            String text = ((ParseText)o).getText();
-            if (text != null) {
-              // get slice name, and remove it from the text
-              if (text.startsWith(sliceMarker)) {
-                int idx = text.indexOf(sliceMarker, sliceMarker.length());
-                if (idx != -1) {
-                  slice = text.substring(sliceMarker.length(), idx);
-                  text = text.substring(idx + sliceMarker.length());
-                }
-              }
-              // get segment name, and remove it from the text
-              if (text.startsWith(nameMarker)) {
-                int idx = text.indexOf(nameMarker, nameMarker.length());
-                if (idx != -1) {
-                  text = text.substring(idx + nameMarker.length());
-                }
-              }
-              o = new ParseText(text);
-            }
             pt_out = ensureMapFile(slice, ParseText.DIR_NAME, ParseText.class);
             pt_out.append(key, o);
           }
@@ -380,165 +301,153 @@
     String lastPTname = null;
     TreeMap linked = new TreeMap();
     while (values.hasNext()) {
-      ObjectWritable wrapper = (ObjectWritable)values.next();
+      MetaWrapper wrapper = (MetaWrapper)values.next();
       Object o = wrapper.get();
+      String spString = wrapper.getMeta(SEGMENT_PART_KEY);
+      if (spString == null) {
+        throw new IOException("Null segment part, key=" + key);        
+      }
+      SegmentPart sp = SegmentPart.parse(spString);
       if (o instanceof CrawlDatum) {
         CrawlDatum val = (CrawlDatum)o;
         // check which output dir it belongs to
-        Text part = (Text)val.getMetaData().get(SEGMENT_PART_KEY);
-        if (part == null)
-          throw new IOException("Null segment part, key=" + key);
-        Text uName = (Text)val.getMetaData().get(SEGMENT_NAME_KEY);
-        if (uName == null)
-          throw new IOException("Null segment name, key=" + key);
-        String name = uName.toString();
-        String partString = part.toString();
-        if (partString.equals(CrawlDatum.GENERATE_DIR_NAME)) {
+        if (sp.partName.equals(CrawlDatum.GENERATE_DIR_NAME)) {
           if (lastG == null) {
             lastG = val;
-            lastGname = name;
+            lastGname = sp.segmentName;
           } else {
             // take newer
-            if (lastGname.compareTo(name) < 0) {
+            if (lastGname.compareTo(sp.segmentName) < 0) {
               lastG = val;
-              lastGname = name;
+              lastGname = sp.segmentName;
             }
           }
-        } else if (partString.equals(CrawlDatum.FETCH_DIR_NAME)) {
+        } else if (sp.partName.equals(CrawlDatum.FETCH_DIR_NAME)) {
           if (lastF == null) {
             lastF = val;
-            lastFname = name;
+            lastFname = sp.segmentName;
           } else {
             // take newer
-            if (lastFname.compareTo(name) < 0) {
+            if (lastFname.compareTo(sp.segmentName) < 0) {
               lastF = val;
-              lastFname = name;
+              lastFname = sp.segmentName;
             }
           }
-        } else if (partString.equals(CrawlDatum.PARSE_DIR_NAME)) {
+        } else if (sp.partName.equals(CrawlDatum.PARSE_DIR_NAME)) {
           if (val.getStatus() == CrawlDatum.STATUS_SIGNATURE) {
             if (lastSig == null) {
               lastSig = val;
-              lastSigname = name;
+              lastSigname = sp.segmentName;
             } else {
               // take newer
-              if (lastSigname.compareTo(name) < 0) {
+              if (lastSigname.compareTo(sp.segmentName) < 0) {
                 lastSig = val;
-                lastSigname = name;
+                lastSigname = sp.segmentName;
               }
             }
             continue;
           }
           // collect all LINKED values from the latest segment
-          ArrayList segLinked = (ArrayList)linked.get(name);
+          ArrayList segLinked = (ArrayList)linked.get(sp.segmentName);
           if (segLinked == null) {
             segLinked = new ArrayList();
-            linked.put(name, segLinked);
+            linked.put(sp.segmentName, segLinked);
           }
           segLinked.add(val);
         } else {
-          throw new IOException("Cannot determine segment part: " + 
partString);
+          throw new IOException("Cannot determine segment part: " + 
sp.partName);
         }
       } else if (o instanceof Content) {
-        String name = 
((Content)o).getMetadata().get(SEGMENT_NAME_KEY.toString());
         if (lastC == null) {
           lastC = (Content)o;
-          lastCname = name;
+          lastCname = sp.segmentName;
         } else {
-          if (lastCname.compareTo(name) < 0) {
+          if (lastCname.compareTo(sp.segmentName) < 0) {
             lastC = (Content)o;
-            lastCname = name;
+            lastCname = sp.segmentName;
           }
         }
       } else if (o instanceof ParseData) {
-        String name = 
((ParseData)o).getParseMeta().get(SEGMENT_NAME_KEY.toString());
         if (lastPD == null) {
           lastPD = (ParseData)o;
-          lastPDname = name;
+          lastPDname = sp.segmentName;
         } else {
-          if (lastPDname.compareTo(name) < 0) {
+          if (lastPDname.compareTo(sp.segmentName) < 0) {
             lastPD = (ParseData)o;
-            lastPDname = name;
+            lastPDname = sp.segmentName;
           }
         }
       } else if (o instanceof ParseText) {
-        String text = ((ParseText)o).getText();
-        String name = null;
-        int idx = text.indexOf(nameMarker, nameMarker.length());
-        if (idx != -1) {
-          name = text.substring(nameMarker.length(), idx);
-        } else {
-          throw new IOException("Missing segment name marker in ParseText, key 
" + key + ": " + text);
-        }
         if (lastPT == null) {
           lastPT = (ParseText)o;
-          lastPTname = name;
+          lastPTname = sp.segmentName;
         } else {
-          if (lastPTname.compareTo(name) < 0) {
+          if (lastPTname.compareTo(sp.segmentName) < 0) {
             lastPT = (ParseText)o;
-            lastPTname = name;
+            lastPTname = sp.segmentName;
           }
         }
       }
     }
     curCount++;
-    Text sliceName = null;
-    ObjectWritable wrapper = new ObjectWritable();
+    String sliceName = null;
+    MetaWrapper wrapper = new MetaWrapper();
     if (sliceSize > 0) {
-      sliceName = new Text(String.valueOf(curCount / sliceSize));
+      sliceName = String.valueOf(curCount / sliceSize);
+      wrapper.setMeta(SEGMENT_SLICE_KEY, sliceName);
     }
+    SegmentPart sp = new SegmentPart();
     // now output the latest values
     if (lastG != null) {
-      if (sliceName != null) {
-        lastG.getMetaData().put(SEGMENT_SLICE_KEY, sliceName);
-      }
       wrapper.set(lastG);
+      sp.partName = CrawlDatum.GENERATE_DIR_NAME;
+      sp.segmentName = lastGname;
+      wrapper.setMeta(SEGMENT_PART_KEY, sp.toString());
       output.collect(key, wrapper);
     }
     if (lastF != null) {
-      if (sliceName != null) {
-        lastF.getMetaData().put(SEGMENT_SLICE_KEY, sliceName);
-      }
       wrapper.set(lastF);
+      sp.partName = CrawlDatum.FETCH_DIR_NAME;
+      sp.segmentName = lastFname;
+      wrapper.setMeta(SEGMENT_PART_KEY, sp.toString());
       output.collect(key, wrapper);
     }
     if (lastSig != null) {
-      if (sliceName != null) {
-        lastSig.getMetaData().put(SEGMENT_SLICE_KEY, sliceName);
-      }
       wrapper.set(lastSig);
+      sp.partName = CrawlDatum.PARSE_DIR_NAME;
+      sp.segmentName = lastSigname;
+      wrapper.setMeta(SEGMENT_PART_KEY, sp.toString());
       output.collect(key, wrapper);
     }
     if (lastC != null) {
-      if (sliceName != null) {
-        lastC.getMetadata().set(sliceMarker, sliceName.toString());
-      }
       wrapper.set(lastC);
+      sp.partName = Content.DIR_NAME;
+      sp.segmentName = lastCname;
+      wrapper.setMeta(SEGMENT_PART_KEY, sp.toString());
       output.collect(key, wrapper);
     }
     if (lastPD != null) {
-      if (sliceName != null) {
-        lastPD.getParseMeta().set(sliceMarker, sliceName.toString());
-      }
       wrapper.set(lastPD);
+      sp.partName = ParseData.DIR_NAME;
+      sp.segmentName = lastPDname;
+      wrapper.setMeta(SEGMENT_PART_KEY, sp.toString());
       output.collect(key, wrapper);
     }
     if (lastPT != null) {
-      if (sliceName != null) {
-        lastPT = new ParseText(sliceMarker + sliceName + sliceMarker
-                + lastPT.getText());
-      }
       wrapper.set(lastPT);
+      sp.partName = ParseText.DIR_NAME;
+      sp.segmentName = lastPTname;
+      wrapper.setMeta(SEGMENT_PART_KEY, sp.toString());
       output.collect(key, wrapper);
     }
     if (linked.size() > 0) {
       String name = (String)linked.lastKey();
+      sp.partName = CrawlDatum.PARSE_DIR_NAME;
+      sp.segmentName = name;
+      wrapper.setMeta(SEGMENT_PART_KEY, sp.toString());
       ArrayList segLinked = (ArrayList)linked.get(name);
       for (int i = 0; i < segLinked.size(); i++) {
         CrawlDatum link = (CrawlDatum)segLinked.get(i);
-        if (sliceName != null) {
-          link.getMetaData().put(SEGMENT_SLICE_KEY, sliceName);
-        }
         wrapper.set(link);
         output.collect(key, wrapper);
       }
@@ -629,7 +538,7 @@
     job.setReducerClass(SegmentMerger.class);
     job.setOutputPath(out);
     job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(ObjectWritable.class);
+    job.setOutputValueClass(MetaWrapper.class);
     job.setOutputFormat(SegmentOutputFormat.class);
     
     setConf(job);

Added: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentPart.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentPart.java?view=auto&rev=474939
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentPart.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentPart.java Tue Nov 14 11:44:39 2006
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.segment;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.FileSplit;
+
+/**
+ * Utility class for handling information about segment parts.
+ *
+ * <p>A segment part is identified by the segment name (the last path
+ * component of the segment directory) and the part name (one of the
+ * subdirectories inside that segment). The fields are public and mutable.
+ * {@link #toString()} renders an instance as "segmentName/partName", and
+ * {@link #parse(String)} reads that form back.</p>
+ *
+ * @author Andrzej Bialecki
+ */
+public class SegmentPart {
+  /** Name of the segment (just the last path component). */
+  public String segmentName;
+  /** Name of the segment part (i.e. one of the subdirectories inside a segment). */
+  public String partName;
+
+  /**
+   * Create an empty SegmentPart. Both names are left <code>null</code>
+   * and may be assigned directly afterwards.
+   */
+  public SegmentPart() {
+    // nothing to initialize
+  }
+
+  /**
+   * Create a SegmentPart with the given names.
+   * @param segmentName name of the segment (last path component)
+   * @param partName name of the part (a subdirectory inside the segment)
+   */
+  public SegmentPart(String segmentName, String partName) {
+    this.segmentName = segmentName;
+    this.partName = partName;
+  }
+
+  /**
+   * Return a String representation of this class, in the form
+   * "segmentName/partName".
+   */
+  public String toString() {
+    return segmentName + "/" + partName;
+  }
+
+  /**
+   * Create SegmentPart from a FileSplit.
+   * @param split input split whose path lies inside a segment part
+   * @return SegmentPart describing the part that contains the split
+   * @throws IOException if the segment or part name cannot be determined
+   *         from the split's path (see {@link #get(String)}).
+   */
+  public static SegmentPart get(FileSplit split) throws IOException {
+    return get(split.getPath().toString());
+  }
+
+  /**
+   * Create SegmentPart from a full path of a location inside any segment part.
+   * The path is expected to look like
+   * <code>.../segmentName/partName/part-xxxxx[/...]</code>. Backslashes are
+   * treated as path separators. A relative path whose first component is the
+   * segment (e.g. <code>segmentName/partName/part-00000</code>) is accepted too.
+   * @param path full path into a segment part (may include "part-xxxxx" components)
+   * @return SegmentPart instance describing this part.
+   * @throws IOException if the path is null, or if any required path
+   *         components are missing or empty.
+   */
+  public static SegmentPart get(String path) throws IOException {
+    if (path == null) {
+      throw new IOException("Cannot determine segment part: " + path);
+    }
+    String dir = path.replace('\\', '/');
+    // cut off the trailing "/part-xxxxx" component and anything below it
+    int partEnd = dir.lastIndexOf("/part-");
+    if (partEnd == -1) {
+      throw new IOException("Cannot determine segment part: " + dir);
+    }
+    String partDir = dir.substring(0, partEnd);
+    // the last remaining component is the part name
+    int partStart = partDir.lastIndexOf('/');
+    String part = partDir.substring(partStart + 1);
+    if (part.length() == 0) {
+      throw new IOException("Cannot determine segment part: " + dir);
+    }
+    if (partStart == -1) {
+      // nothing is left above the part directory, so there is no segment name
+      throw new IOException("Cannot determine segment name: " + dir);
+    }
+    // the component before the part is the segment name; for a relative
+    // path this is simply everything that remains
+    String segDir = partDir.substring(0, partStart);
+    String segment = segDir.substring(segDir.lastIndexOf('/') + 1);
+    if (segment.length() == 0) {
+      throw new IOException("Cannot determine segment name: " + dir);
+    }
+    return new SegmentPart(segment, part);
+  }
+
+  /**
+   * Create SegmentPart from a String in format "segmentName/partName".
+   * @param string input String, typically produced by {@link #toString()}
+   * @return parsed instance of SegmentPart
+   * @throws IOException if the string is null, "/" is missing, or the
+   *         segment name or the part name is empty.
+   */
+  public static SegmentPart parse(String string) throws IOException {
+    int idx = (string == null) ? -1 : string.indexOf('/');
+    // reject null, a missing separator, and empty segment or part names;
+    // idx <= 0 short-circuits before string.length() is evaluated on null
+    if (idx <= 0 || idx == string.length() - 1) {
+      throw new IOException("Invalid SegmentPart: '" + string + "'");
+    }
+    String segment = string.substring(0, idx);
+    String part = string.substring(idx + 1);
+    return new SegmentPart(segment, part);
+  }
+}
\ No newline at end of file

Propchange: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentPart.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to