Author: dogacan
Date: Wed Jun 27 01:39:22 2007
New Revision: 551098

URL: http://svn.apache.org/viewvc?view=rev&rev=551098
Log:
NUTCH-499 - Refactor LinkDb and LinkDbMerger to reuse code.
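For orientation before the diff: the change removes the duplicated merge reducer and the createMergeJob() factory from LinkDb, so that both LinkDb.invert() and LinkDbMerger.merge() share the single copy kept in LinkDbMerger. A minimal sketch of driving the shared factory directly, assuming the 0.9-era mapred API used throughout the diff; the class name, paths, and flag values here are illustrative, not part of the commit:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.nutch.crawl.LinkDb;
    import org.apache.nutch.crawl.LinkDbMerger;
    import org.apache.nutch.util.NutchConfiguration;

    public class MergeJobSketch {
      public static void main(String[] args) throws Exception {
        // createMergeJob() now lives on LinkDbMerger; LinkDb.invert() calls
        // the same factory when folding a freshly inverted segment into an
        // existing linkdb.
        JobConf job = LinkDbMerger.createMergeJob(
            NutchConfiguration.create(),
            new Path("linkdb"), // target db, used in the job name (illustrative)
            true,               // normalize URLs in the LinkDbFilter map phase
            true);              // filter URLs in the LinkDbFilter map phase
        // Inputs are the "current" dirs of existing linkdbs (illustrative).
        job.addInputPath(new Path(new Path("linkdb-old"), LinkDb.CURRENT_NAME));
        job.addInputPath(new Path(new Path("linkdb-new"), LinkDb.CURRENT_NAME));
        JobClient.runJob(job);
        // Note: the job writes to a temporary "linkdb-merge-<random>" dir;
        // callers such as merge() rename that output into place afterwards.
      }
    }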
Modified:
    lucene/nutch/trunk/CHANGES.txt
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java

Modified: lucene/nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/CHANGES.txt?view=diff&rev=551098&r1=551097&r2=551098
==============================================================================
--- lucene/nutch/trunk/CHANGES.txt (original)
+++ lucene/nutch/trunk/CHANGES.txt Wed Jun 27 01:39:22 2007
@@ -70,6 +70,8 @@
 22. NUTCH-434 - Replace usage of ObjectWritable with something based on
     GenericWritable. (dogacan)
 
+23. NUTCH-499 - Refactor LinkDb and LinkDbMerger to reuse code. (dogacan)
+
 Release 0.9 - 2007-04-02
 
  1. Changed log4j confiquration to log to stdout on commandline

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java?view=diff&rev=551098&r1=551097&r2=551098
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java Wed Jun 27 01:39:22 2007
@@ -41,7 +41,7 @@
 import org.apache.nutch.util.NutchJob;
 
 /** Maintains an inverted link map, listing incoming links for each url. */
-public class LinkDb extends ToolBase implements Mapper, Reducer {
+public class LinkDb extends ToolBase implements Mapper {
 
   public static final Log LOG = LogFactory.getLog(LinkDb.class);
 
@@ -49,41 +49,10 @@
   public static final String LOCK_NAME = ".locked";
 
   private int maxAnchorLength;
-  private int maxInlinks;
   private boolean ignoreInternalLinks;
   private URLFilters urlFilters;
   private URLNormalizers urlNormalizers;
 
-  public static class Merger extends MapReduceBase implements Reducer {
-    private int _maxInlinks;
-
-    public void configure(JobConf job) {
-      super.configure(job);
-      _maxInlinks = job.getInt("db.max.inlinks", 10000);
-    }
-
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
-      Inlinks inlinks = null;
-      while (values.hasNext()) {
-        if (inlinks == null) {
-          inlinks = (Inlinks)values.next();
-          continue;
-        }
-        Inlinks val = (Inlinks)values.next();
-        for (Iterator it = val.iterator(); it.hasNext(); ) {
-          if (inlinks.size() >= _maxInlinks) {
-            output.collect(key, inlinks);
-            return;
-          }
-          Inlink in = (Inlink)it.next();
-          inlinks.add(in);
-        }
-      }
-      if (inlinks.size() == 0) return;
-      output.collect(key, inlinks);
-    }
-  }
-
   public LinkDb() {
   }
 
@@ -94,7 +63,6 @@
 
   public void configure(JobConf job) {
     maxAnchorLength = job.getInt("db.max.anchor.length", 100);
-    maxInlinks = job.getInt("db.max.inlinks", 10000);
     ignoreInternalLinks = job.getBoolean("db.ignore.internal.links", true);
     if (job.getBoolean(LinkDbFilter.URL_FILTERING, false)) {
       urlFilters = new URLFilters(job);
@@ -176,26 +144,6 @@
     }
   }
 
-  public void reduce(WritableComparable key, Iterator values,
-                     OutputCollector output, Reporter reporter)
-    throws IOException {
-
-    Inlinks result = new Inlinks();
-
-    while (values.hasNext()) {
-      Inlinks inlinks = (Inlinks)values.next();
-
-      int end = Math.min(maxInlinks - result.size(), inlinks.size());
-      Iterator it = inlinks.iterator();
-      int i = 0;
-      while(it.hasNext() && i++ < end) {
-        result.add((Inlink)it.next());
-      }
-    }
-    if (result.size() == 0) return;
-    output.collect(key, result);
-  }
-
   public void invert(Path linkDb, final Path segmentsDir, boolean normalize,
                      boolean filter, boolean force) throws IOException {
     final FileSystem fs = FileSystem.get(getConf());
     Path[] files = fs.listPaths(segmentsDir, new PathFilter() {
@@ -240,7 +188,7 @@
     }
     // try to merge
     Path newLinkDb = job.getOutputPath();
-    job = LinkDb.createMergeJob(getConf(), linkDb, normalize, filter);
+    job = LinkDbMerger.createMergeJob(getConf(), linkDb, normalize, filter);
     job.addInputPath(currentLinkDb);
     job.addInputPath(newLinkDb);
     try {
@@ -279,31 +227,7 @@
         LOG.warn("LinkDb createJob: " + e);
       }
     }
-    job.setReducerClass(LinkDb.class);
-
-    job.setOutputPath(newLinkDb);
-    job.setOutputFormat(MapFileOutputFormat.class);
-    job.setBoolean("mapred.output.compress", true);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(Inlinks.class);
-
-    return job;
-  }
-
-  public static JobConf createMergeJob(Configuration config, Path linkDb, boolean normalize, boolean filter) {
-    Path newLinkDb =
-      new Path("linkdb-merge-" +
-               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
-
-    JobConf job = new NutchJob(config);
-    job.setJobName("linkdb merge " + linkDb);
-
-    job.setInputFormat(SequenceFileInputFormat.class);
-
-    job.setMapperClass(LinkDbFilter.class);
-    job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
-    job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
-    job.setReducerClass(Merger.class);
+    job.setReducerClass(LinkDbMerger.class);
 
     job.setOutputPath(newLinkDb);
     job.setOutputFormat(MapFileOutputFormat.class);
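A note on the reducer that survives the consolidation (added to LinkDbMerger in the next file): of the two duplicated implementations, the kept one accumulates inlinks across all values for a key but caps the total at db.max.inlinks (default 10000). A standalone sketch of that capping loop, with plain Java lists standing in for the Inlinks/Inlink writables:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class InlinkCapSketch {
      // Mirrors the reduce() kept in LinkDbMerger: copy inlinks from each
      // incoming batch, but never beyond maxInlinks in total.
      static List<String> capInlinks(Iterator<List<String>> values, int maxInlinks) {
        List<String> result = new ArrayList<String>();
        while (values.hasNext()) {
          List<String> inlinks = values.next();
          // Take the smaller of this batch's size and the remaining headroom.
          int end = Math.min(maxInlinks - result.size(), inlinks.size());
          Iterator<String> it = inlinks.iterator();
          int i = 0;
          while (it.hasNext() && i++ < end) {
            result.add(it.next());
          }
        }
        return result; // the real reducer skips collect() when this is empty
      }

      public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
            Arrays.asList("a", "b", "c"), Arrays.asList("d", "e"));
        // Cap of 4: prints [a, b, c, d].
        System.out.println(capInlinks(batches.iterator(), 4));
      }
    }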
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java?view=diff&rev=551098&r1=551097&r2=551098
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java Wed Jun 27 01:39:22 2007
@@ -16,18 +16,29 @@
  */
 package org.apache.nutch.crawl;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolBase;
 import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
 
 /**
  * This tool merges several LinkDb-s into one, optionally filtering
@@ -47,9 +58,11 @@
  *
  * @author Andrzej Bialecki
  */
-public class LinkDbMerger extends ToolBase {
+public class LinkDbMerger extends ToolBase implements Reducer {
   private static final Log LOG = LogFactory.getLog(LinkDbMerger.class);
 
+  private int maxInlinks;
+
   public LinkDbMerger() {
 
   }
@@ -58,8 +71,33 @@
     setConf(conf);
   }
 
+  public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+
+    Inlinks result = new Inlinks();
+
+    while (values.hasNext()) {
+      Inlinks inlinks = (Inlinks)values.next();
+
+      int end = Math.min(maxInlinks - result.size(), inlinks.size());
+      Iterator it = inlinks.iterator();
+      int i = 0;
+      while(it.hasNext() && i++ < end) {
+        result.add((Inlink)it.next());
+      }
+    }
+    if (result.size() == 0) return;
+    output.collect(key, result);
+
+  }
+
+  public void configure(JobConf job) {
+    maxInlinks = job.getInt("db.max.inlinks", 10000);
+  }
+
+  public void close() throws IOException { }
+
   public void merge(Path output, Path[] dbs, boolean normalize, boolean filter) throws Exception {
-    JobConf job = LinkDb.createMergeJob(getConf(), output, normalize, filter);
+    JobConf job = createMergeJob(getConf(), output, normalize, filter);
     for (int i = 0; i < dbs.length; i++) {
       job.addInputPath(new Path(dbs[i], LinkDb.CURRENT_NAME));
     }
@@ -68,6 +106,31 @@
     fs.mkdirs(output);
     fs.rename(job.getOutputPath(), new Path(output, LinkDb.CURRENT_NAME));
   }
+
+  public static JobConf createMergeJob(Configuration config, Path linkDb, boolean normalize, boolean filter) {
+    Path newLinkDb =
+      new Path("linkdb-merge-" +
+               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf job = new NutchJob(config);
+    job.setJobName("linkdb merge " + linkDb);
+
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    job.setMapperClass(LinkDbFilter.class);
+    job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
+    job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
+    job.setReducerClass(LinkDbMerger.class);
+
+    job.setOutputPath(newLinkDb);
+    job.setOutputFormat(MapFileOutputFormat.class);
+    job.setBoolean("mapred.output.compress", true);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Inlinks.class);
+
+    return job;
+  }
+
   /**
    * @param args
    */
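Finally, a hedged sketch of exercising the merged tool programmatically through merge(), whose signature appears in the diff above. The Configuration-taking constructor is inferred from the setConf(conf) context line, and all paths here are illustrative:

    import org.apache.hadoop.fs.Path;
    import org.apache.nutch.crawl.LinkDbMerger;
    import org.apache.nutch.util.NutchConfiguration;

    public class MergeDriverSketch {
      public static void main(String[] args) throws Exception {
        LinkDbMerger merger = new LinkDbMerger(NutchConfiguration.create());
        // merge() appends LinkDb.CURRENT_NAME to each input db, runs the
        // merge job, then renames the job output under the target path.
        merger.merge(
            new Path("crawl/linkdb-merged"),            // target linkdb
            new Path[] { new Path("crawl-a/linkdb"),
                         new Path("crawl-b/linkdb") },
            true,  // normalize
            true); // filter
      }
    }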