Author: ab Date: Tue Nov 14 11:44:39 2006 New Revision: 474939 URL: http://svn.apache.org/viewvc?view=rev&rev=474939 Log: Refactor SegmentMerger to use MetaWrapper. This significantly simplifies the code, and avoids crude hacks to pass around segment-related metadata.
Added: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentPart.java (with props) Modified: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java Modified: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java?view=diff&rev=474939&r1=474938&r2=474939 ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java Tue Nov 14 11:44:39 2006 @@ -33,6 +33,7 @@ import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.Generator; import org.apache.nutch.fetcher.Fetcher; +import org.apache.nutch.metadata.MetaWrapper; import org.apache.nutch.metadata.Metadata; import org.apache.nutch.net.URLFilters; import org.apache.nutch.parse.ParseData; @@ -90,19 +91,16 @@ public class SegmentMerger extends Configured implements Mapper, Reducer { private static final Log LOG = LogFactory.getLog(SegmentMerger.class); - private static final Text SEGMENT_PART_KEY = new Text("_PaRt_"); - private static final Text SEGMENT_NAME_KEY = new Text("_NaMe_"); - private static final String nameMarker = SEGMENT_NAME_KEY.toString(); - private static final Text SEGMENT_SLICE_KEY = new Text("_SlIcE_"); - private static final String sliceMarker = SEGMENT_SLICE_KEY.toString(); + private static final String SEGMENT_PART_KEY = "part"; + private static final String SEGMENT_SLICE_KEY = "slice"; private URLFilters filters = null; private long sliceSize = -1; private long curCount = 0; /** - * Wraps inputs in an {@link ObjectWritable}, to permit merging different - * types in reduce. + * Wraps inputs in an {@link MetaWrapper}, to permit merging different + * types in reduce and use additional metadata.
*/ public static class ObjectInputFormat extends SequenceFileInputFormat { public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter) @@ -110,68 +108,30 @@ reporter.setStatus(split.toString()); // find part name - String dir = split.getPath().toString().replace('\\', '/'); - int idx = dir.lastIndexOf("/part-"); - if (idx == -1) { - throw new IOException("Cannot determine segment part: " + dir); - } - dir = dir.substring(0, idx); - idx = dir.lastIndexOf('/'); - if (idx == -1) { - throw new IOException("Cannot determine segment part: " + dir); - } - final String part = dir.substring(idx + 1); - // find segment name - dir = dir.substring(0, idx); - idx = dir.lastIndexOf('/'); - if (idx == -1) { - throw new IOException("Cannot determine segment name: " + dir); - } - final String segment = dir.substring(idx + 1); + final SegmentPart segmentPart = SegmentPart.get(split); + final String spString = segmentPart.toString(); return new SequenceFileRecordReader(job, split) { public synchronized boolean next(Writable key, Writable value) throws IOException { - ObjectWritable wrapper = (ObjectWritable) value; + MetaWrapper wrapper = (MetaWrapper) value; try { wrapper.set(getValueClass().newInstance()); } catch (Exception e) { throw new IOException(e.toString()); } boolean res = super.next(key, (Writable) wrapper.get()); - Object o = wrapper.get(); - if (o instanceof CrawlDatum) { - // record which part of segment this comes from - ((CrawlDatum)o).getMetaData().put(SEGMENT_PART_KEY, new Text(part)); - ((CrawlDatum)o).getMetaData().put(SEGMENT_NAME_KEY, new Text(segment)); - } else if (o instanceof Content) { - if (((Content)o).getMetadata() == null) { - ((Content)o).setMetadata(new Metadata()); - } - ((Content)o).getMetadata().set(SEGMENT_NAME_KEY.toString(), segment); - } else if (o instanceof ParseData) { - if (((ParseData)o).getParseMeta() == null) { - ((ParseData)o).setParseMeta(new Metadata()); - } - 
((ParseData)o).getParseMeta().set(SEGMENT_NAME_KEY.toString(), segment); - } else if (o instanceof ParseText) { - String text = ((ParseText)o).getText(); - o = new ParseText(SEGMENT_NAME_KEY.toString() + - segment + SEGMENT_NAME_KEY.toString() + text); - wrapper.set(o); - } else { - throw new IOException("Unknown value type: " + o.getClass().getName() + "(" + o + ")"); - } + wrapper.setMeta(SEGMENT_PART_KEY, spString); return res; } public Writable createValue() { - return new ObjectWritable(); + return new MetaWrapper(); } }; } } - public static class SegmentOutputFormat extends org.apache.hadoop.mapred.OutputFormatBase { + public static class SegmentOutputFormat extends OutputFormatBase { private static final String DEFAULT_SLICE = "default"; public RecordWriter getRecordWriter(final FileSystem fs, final JobConf job, final String name, final Progressable progress) throws IOException { @@ -187,47 +147,27 @@ public void write(WritableComparable key, Writable value) throws IOException { // unwrap - Writable o = (Writable)((ObjectWritable)value).get(); - String slice = null; + MetaWrapper wrapper = (MetaWrapper)value; + SegmentPart sp = SegmentPart.parse(wrapper.getMeta(SEGMENT_PART_KEY)); + Writable o = (Writable)wrapper.get(); + String slice = wrapper.getMeta(SEGMENT_SLICE_KEY); if (o instanceof CrawlDatum) { - // check which output dir it should go into - Text part = (Text)((CrawlDatum)o).getMetaData().get(SEGMENT_PART_KEY); - ((CrawlDatum)o).getMetaData().remove(SEGMENT_PART_KEY); - ((CrawlDatum)o).getMetaData().remove(SEGMENT_NAME_KEY); - if (part == null) - throw new IOException("Null segment part, key=" + key); - Text uSlice = (Text)((CrawlDatum)o).getMetaData().get(SEGMENT_SLICE_KEY); - ((CrawlDatum)o).getMetaData().remove(SEGMENT_SLICE_KEY); - if (uSlice != null) slice = uSlice.toString(); - String partString = part.toString(); - if (partString.equals(CrawlDatum.GENERATE_DIR_NAME)) { + if (sp.partName.equals(CrawlDatum.GENERATE_DIR_NAME)) { g_out = 
ensureSequenceFile(slice, CrawlDatum.GENERATE_DIR_NAME); g_out.append(key, o); - } else if (partString.equals(CrawlDatum.FETCH_DIR_NAME)) { + } else if (sp.partName.equals(CrawlDatum.FETCH_DIR_NAME)) { f_out = ensureMapFile(slice, CrawlDatum.FETCH_DIR_NAME, CrawlDatum.class); f_out.append(key, o); - } else if (partString.equals(CrawlDatum.PARSE_DIR_NAME)) { + } else if (sp.partName.equals(CrawlDatum.PARSE_DIR_NAME)) { p_out = ensureSequenceFile(slice, CrawlDatum.PARSE_DIR_NAME); p_out.append(key, o); } else { - throw new IOException("Cannot determine segment part: " + partString); + throw new IOException("Cannot determine segment part: " + sp.partName); } } else if (o instanceof Content) { - slice = ((Content)o).getMetadata().get(sliceMarker); - ((Content)o).getMetadata().remove(sliceMarker); - ((Content)o).getMetadata().remove(nameMarker); - // update the segment name inside metadata - if (slice == null) { - ((Content)o).getMetadata().set(Fetcher.SEGMENT_NAME_KEY, segmentName); - } else { - ((Content)o).getMetadata().set(Fetcher.SEGMENT_NAME_KEY, segmentName + "-" + slice); - } c_out = ensureMapFile(slice, Content.DIR_NAME, Content.class); c_out.append(key, o); } else if (o instanceof ParseData) { - slice = ((ParseData)o).getParseMeta().get(sliceMarker); - ((ParseData)o).getParseMeta().remove(sliceMarker); - ((ParseData)o).getParseMeta().remove(nameMarker); // update the segment name inside contentMeta - required by Indexer if (slice == null) { ((ParseData)o).getContentMeta().set(Fetcher.SEGMENT_NAME_KEY, segmentName); @@ -237,25 +177,6 @@ pd_out = ensureMapFile(slice, ParseData.DIR_NAME, ParseData.class); pd_out.append(key, o); } else if (o instanceof ParseText) { - String text = ((ParseText)o).getText(); - if (text != null) { - // get slice name, and remove it from the text - if (text.startsWith(sliceMarker)) { - int idx = text.indexOf(sliceMarker, sliceMarker.length()); - if (idx != -1) { - slice = text.substring(sliceMarker.length(), idx); - text = 
text.substring(idx + sliceMarker.length()); - } - } - // get segment name, and remove it from the text - if (text.startsWith(nameMarker)) { - int idx = text.indexOf(nameMarker, nameMarker.length()); - if (idx != -1) { - text = text.substring(idx + nameMarker.length()); - } - } - o = new ParseText(text); - } pt_out = ensureMapFile(slice, ParseText.DIR_NAME, ParseText.class); pt_out.append(key, o); } @@ -380,165 +301,153 @@ String lastPTname = null; TreeMap linked = new TreeMap(); while (values.hasNext()) { - ObjectWritable wrapper = (ObjectWritable)values.next(); + MetaWrapper wrapper = (MetaWrapper)values.next(); Object o = wrapper.get(); + String spString = wrapper.getMeta(SEGMENT_PART_KEY); + if (spString == null) { + throw new IOException("Null segment part, key=" + key); + } + SegmentPart sp = SegmentPart.parse(spString); if (o instanceof CrawlDatum) { CrawlDatum val = (CrawlDatum)o; // check which output dir it belongs to - Text part = (Text)val.getMetaData().get(SEGMENT_PART_KEY); - if (part == null) - throw new IOException("Null segment part, key=" + key); - Text uName = (Text)val.getMetaData().get(SEGMENT_NAME_KEY); - if (uName == null) - throw new IOException("Null segment name, key=" + key); - String name = uName.toString(); - String partString = part.toString(); - if (partString.equals(CrawlDatum.GENERATE_DIR_NAME)) { + if (sp.partName.equals(CrawlDatum.GENERATE_DIR_NAME)) { if (lastG == null) { lastG = val; - lastGname = name; + lastGname = sp.segmentName; } else { // take newer - if (lastGname.compareTo(name) < 0) { + if (lastGname.compareTo(sp.segmentName) < 0) { lastG = val; - lastGname = name; + lastGname = sp.segmentName; } } - } else if (partString.equals(CrawlDatum.FETCH_DIR_NAME)) { + } else if (sp.partName.equals(CrawlDatum.FETCH_DIR_NAME)) { if (lastF == null) { lastF = val; - lastFname = name; + lastFname = sp.segmentName; } else { // take newer - if (lastFname.compareTo(name) < 0) { + if (lastFname.compareTo(sp.segmentName) < 0) { lastF = 
val; - lastFname = name; + lastFname = sp.segmentName; } } - } else if (partString.equals(CrawlDatum.PARSE_DIR_NAME)) { + } else if (sp.partName.equals(CrawlDatum.PARSE_DIR_NAME)) { if (val.getStatus() == CrawlDatum.STATUS_SIGNATURE) { if (lastSig == null) { lastSig = val; - lastSigname = name; + lastSigname = sp.segmentName; } else { // take newer - if (lastSigname.compareTo(name) < 0) { + if (lastSigname.compareTo(sp.segmentName) < 0) { lastSig = val; - lastSigname = name; + lastSigname = sp.segmentName; } } continue; } // collect all LINKED values from the latest segment - ArrayList segLinked = (ArrayList)linked.get(name); + ArrayList segLinked = (ArrayList)linked.get(sp.segmentName); if (segLinked == null) { segLinked = new ArrayList(); - linked.put(name, segLinked); + linked.put(sp.segmentName, segLinked); } segLinked.add(val); } else { - throw new IOException("Cannot determine segment part: " + partString); + throw new IOException("Cannot determine segment part: " + sp.partName); } } else if (o instanceof Content) { - String name = ((Content)o).getMetadata().get(SEGMENT_NAME_KEY.toString()); if (lastC == null) { lastC = (Content)o; - lastCname = name; + lastCname = sp.segmentName; } else { - if (lastCname.compareTo(name) < 0) { + if (lastCname.compareTo(sp.segmentName) < 0) { lastC = (Content)o; - lastCname = name; + lastCname = sp.segmentName; } } } else if (o instanceof ParseData) { - String name = ((ParseData)o).getParseMeta().get(SEGMENT_NAME_KEY.toString()); if (lastPD == null) { lastPD = (ParseData)o; - lastPDname = name; + lastPDname = sp.segmentName; } else { - if (lastPDname.compareTo(name) < 0) { + if (lastPDname.compareTo(sp.segmentName) < 0) { lastPD = (ParseData)o; - lastPDname = name; + lastPDname = sp.segmentName; } } } else if (o instanceof ParseText) { - String text = ((ParseText)o).getText(); - String name = null; - int idx = text.indexOf(nameMarker, nameMarker.length()); - if (idx != -1) { - name = text.substring(nameMarker.length(), idx); 
- } else { - throw new IOException("Missing segment name marker in ParseText, key " + key + ": " + text); - } if (lastPT == null) { lastPT = (ParseText)o; - lastPTname = name; + lastPTname = sp.segmentName; } else { - if (lastPTname.compareTo(name) < 0) { + if (lastPTname.compareTo(sp.segmentName) < 0) { lastPT = (ParseText)o; - lastPTname = name; + lastPTname = sp.segmentName; } } } } curCount++; - Text sliceName = null; - ObjectWritable wrapper = new ObjectWritable(); + String sliceName = null; + MetaWrapper wrapper = new MetaWrapper(); if (sliceSize > 0) { - sliceName = new Text(String.valueOf(curCount / sliceSize)); + sliceName = String.valueOf(curCount / sliceSize); + wrapper.setMeta(SEGMENT_SLICE_KEY, sliceName); } + SegmentPart sp = new SegmentPart(); // now output the latest values if (lastG != null) { - if (sliceName != null) { - lastG.getMetaData().put(SEGMENT_SLICE_KEY, sliceName); - } wrapper.set(lastG); + sp.partName = CrawlDatum.GENERATE_DIR_NAME; + sp.segmentName = lastGname; + wrapper.setMeta(SEGMENT_PART_KEY, sp.toString()); output.collect(key, wrapper); } if (lastF != null) { - if (sliceName != null) { - lastF.getMetaData().put(SEGMENT_SLICE_KEY, sliceName); - } wrapper.set(lastF); + sp.partName = CrawlDatum.FETCH_DIR_NAME; + sp.segmentName = lastFname; + wrapper.setMeta(SEGMENT_PART_KEY, sp.toString()); output.collect(key, wrapper); } if (lastSig != null) { - if (sliceName != null) { - lastSig.getMetaData().put(SEGMENT_SLICE_KEY, sliceName); - } wrapper.set(lastSig); + sp.partName = CrawlDatum.PARSE_DIR_NAME; + sp.segmentName = lastSigname; + wrapper.setMeta(SEGMENT_PART_KEY, sp.toString()); output.collect(key, wrapper); } if (lastC != null) { - if (sliceName != null) { - lastC.getMetadata().set(sliceMarker, sliceName.toString()); - } wrapper.set(lastC); + sp.partName = Content.DIR_NAME; + sp.segmentName = lastCname; + wrapper.setMeta(SEGMENT_PART_KEY, sp.toString()); output.collect(key, wrapper); } if (lastPD != null) { - if (sliceName != null) 
{ - lastPD.getParseMeta().set(sliceMarker, sliceName.toString()); - } wrapper.set(lastPD); + sp.partName = ParseData.DIR_NAME; + sp.segmentName = lastPDname; + wrapper.setMeta(SEGMENT_PART_KEY, sp.toString()); output.collect(key, wrapper); } if (lastPT != null) { - if (sliceName != null) { - lastPT = new ParseText(sliceMarker + sliceName + sliceMarker - + lastPT.getText()); - } wrapper.set(lastPT); + sp.partName = ParseText.DIR_NAME; + sp.segmentName = lastPTname; + wrapper.setMeta(SEGMENT_PART_KEY, sp.toString()); output.collect(key, wrapper); } if (linked.size() > 0) { String name = (String)linked.lastKey(); + sp.partName = CrawlDatum.PARSE_DIR_NAME; + sp.segmentName = name; + wrapper.setMeta(SEGMENT_PART_KEY, sp.toString()); ArrayList segLinked = (ArrayList)linked.get(name); for (int i = 0; i < segLinked.size(); i++) { CrawlDatum link = (CrawlDatum)segLinked.get(i); - if (sliceName != null) { - link.getMetaData().put(SEGMENT_SLICE_KEY, sliceName); - } wrapper.set(link); output.collect(key, wrapper); } @@ -629,7 +538,7 @@ job.setReducerClass(SegmentMerger.class); job.setOutputPath(out); job.setOutputKeyClass(Text.class); - job.setOutputValueClass(ObjectWritable.class); + job.setOutputValueClass(MetaWrapper.class); job.setOutputFormat(SegmentOutputFormat.class); setConf(job); Added: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentPart.java URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentPart.java?view=auto&rev=474939 ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentPart.java (added) +++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentPart.java Tue Nov 14 11:44:39 2006 @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.segment; + +import java.io.IOException; + +import org.apache.hadoop.mapred.FileSplit; + +/** + * Utility class describing one segment part: a (segmentName, partName) pair that identifies a subdirectory inside a segment. + * + * @author Andrzej Bialecki + */ +public class SegmentPart { + /** Name of the segment (just the last path component). */ + public String segmentName; + /** Name of the segment part (i.e. one of the subdirectories inside a segment). */ + public String partName; + /** Create an empty SegmentPart; both public fields stay null until the caller assigns them. */ + public SegmentPart() { + + } + /** Create a SegmentPart for the given segment name and part name. */ + public SegmentPart(String segmentName, String partName) { + this.segmentName = segmentName; + this.partName = partName; + } + + /** + * Return a String representation of this class, in the form + * "segmentName/partName" (the format read back by {@link #parse(String)}). + */ + public String toString() { + return segmentName + "/" + partName; + } + + /** + * Create SegmentPart from a FileSplit, using the split's full path. + * @param split input split whose path lies inside a segment part + * @return SegmentPart describing the segment and part that contain the split + * @throws IOException if the split's path lacks a "/part-" component or the directories above it + */ + public static SegmentPart get(FileSplit split) throws IOException { + return get(split.getPath().toString()); + } + + /** + * Create SegmentPart from a full path of a location inside any segment part. The part name is the directory containing the last "/part-" entry; the segment name is that directory's parent. + * @param path full path into a segment part (may include "part-xxxxx" components) + * @return SegmentPart instance describing this part. + * @throws IOException if any required path components are missing.
+ */ + public static SegmentPart get(String path) throws IOException { + // normalize backslash separators to '/', then find the part name: the directory holding the last "/part-" entry + String dir = path.replace('\\', '/'); + int idx = dir.lastIndexOf("/part-"); + if (idx == -1) { + throw new IOException("Cannot determine segment part: " + dir); + } + dir = dir.substring(0, idx); + idx = dir.lastIndexOf('/'); + if (idx == -1) { + throw new IOException("Cannot determine segment part: " + dir); + } + String part = dir.substring(idx + 1); + // find segment name: the parent directory of the part directory + dir = dir.substring(0, idx); + idx = dir.lastIndexOf('/'); + if (idx == -1) { + throw new IOException("Cannot determine segment name: " + dir); + } + String segment = dir.substring(idx + 1); + return new SegmentPart(segment, part); + } + + /** + * Create SegmentPart from a String in format "segmentName/partName", splitting at the first '/'. + * @param string input String; must not be null (a null value throws NullPointerException, not IOException) + * @return parsed instance of SegmentPart + * @throws IOException if "/" is missing. + */ + public static SegmentPart parse(String string) throws IOException { + int idx = string.indexOf('/'); + if (idx == -1) { + throw new IOException("Invalid SegmentPart: '" + string + "'"); + } + String segment = string.substring(0, idx); + String part = string.substring(idx + 1); + return new SegmentPart(segment, part); + } +} \ No newline at end of file Propchange: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentPart.java ------------------------------------------------------------------------------ svn:eol-style = native