Author: dogacan
Date: Wed Jun 27 01:39:22 2007
New Revision: 551098
URL: http://svn.apache.org/viewvc?view=rev&rev=551098
Log:
NUTCH-499 - Refactor LinkDb and LinkDbMerger to reuse code.
Modified:
lucene/nutch/trunk/CHANGES.txt
lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java
lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java
Modified: lucene/nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/CHANGES.txt?view=diff&rev=551098&r1=551097&r2=551098
==============================================================================
--- lucene/nutch/trunk/CHANGES.txt (original)
+++ lucene/nutch/trunk/CHANGES.txt Wed Jun 27 01:39:22 2007
@@ -70,6 +70,8 @@
22. NUTCH-434 - Replace usage of ObjectWritable with something based on
GenericWritable. (dogacan)
+23. NUTCH-499 - Refactor LinkDb and LinkDbMerger to reuse code. (dogacan)
+
Release 0.9 - 2007-04-02
1. Changed log4j configuration to log to stdout on commandline
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java?view=diff&rev=551098&r1=551097&r2=551098
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java Wed Jun 27 01:39:22 2007
@@ -41,7 +41,7 @@
import org.apache.nutch.util.NutchJob;
/** Maintains an inverted link map, listing incoming links for each url. */
-public class LinkDb extends ToolBase implements Mapper, Reducer {
+public class LinkDb extends ToolBase implements Mapper {
public static final Log LOG = LogFactory.getLog(LinkDb.class);
@@ -49,41 +49,10 @@
public static final String LOCK_NAME = ".locked";
private int maxAnchorLength;
- private int maxInlinks;
private boolean ignoreInternalLinks;
private URLFilters urlFilters;
private URLNormalizers urlNormalizers;
- public static class Merger extends MapReduceBase implements Reducer {
- private int _maxInlinks;
-
- public void configure(JobConf job) {
- super.configure(job);
- _maxInlinks = job.getInt("db.max.inlinks", 10000);
- }
-
- public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
- Inlinks inlinks = null;
- while (values.hasNext()) {
- if (inlinks == null) {
- inlinks = (Inlinks)values.next();
- continue;
- }
- Inlinks val = (Inlinks)values.next();
- for (Iterator it = val.iterator(); it.hasNext(); ) {
- if (inlinks.size() >= _maxInlinks) {
- output.collect(key, inlinks);
- return;
- }
- Inlink in = (Inlink)it.next();
- inlinks.add(in);
- }
- }
- if (inlinks.size() == 0) return;
- output.collect(key, inlinks);
- }
- }
-
public LinkDb() {
}
@@ -94,7 +63,6 @@
public void configure(JobConf job) {
maxAnchorLength = job.getInt("db.max.anchor.length", 100);
- maxInlinks = job.getInt("db.max.inlinks", 10000);
ignoreInternalLinks = job.getBoolean("db.ignore.internal.links", true);
if (job.getBoolean(LinkDbFilter.URL_FILTERING, false)) {
urlFilters = new URLFilters(job);
@@ -176,26 +144,6 @@
}
}
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector output, Reporter reporter)
- throws IOException {
-
- Inlinks result = new Inlinks();
-
- while (values.hasNext()) {
- Inlinks inlinks = (Inlinks)values.next();
-
- int end = Math.min(maxInlinks - result.size(), inlinks.size());
- Iterator it = inlinks.iterator();
- int i = 0;
- while(it.hasNext() && i++ < end) {
- result.add((Inlink)it.next());
- }
- }
- if (result.size() == 0) return;
- output.collect(key, result);
- }
-
public void invert(Path linkDb, final Path segmentsDir, boolean normalize,
boolean filter, boolean force) throws IOException {
final FileSystem fs = FileSystem.get(getConf());
Path[] files = fs.listPaths(segmentsDir, new PathFilter() {
@@ -240,7 +188,7 @@
}
// try to merge
Path newLinkDb = job.getOutputPath();
- job = LinkDb.createMergeJob(getConf(), linkDb, normalize, filter);
+ job = LinkDbMerger.createMergeJob(getConf(), linkDb, normalize, filter);
job.addInputPath(currentLinkDb);
job.addInputPath(newLinkDb);
try {
@@ -279,31 +227,7 @@
LOG.warn("LinkDb createJob: " + e);
}
}
- job.setReducerClass(LinkDb.class);
-
- job.setOutputPath(newLinkDb);
- job.setOutputFormat(MapFileOutputFormat.class);
- job.setBoolean("mapred.output.compress", true);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Inlinks.class);
-
- return job;
- }
-
- public static JobConf createMergeJob(Configuration config, Path linkDb, boolean normalize, boolean filter) {
- Path newLinkDb =
- new Path("linkdb-merge-" +
- Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
-
- JobConf job = new NutchJob(config);
- job.setJobName("linkdb merge " + linkDb);
-
- job.setInputFormat(SequenceFileInputFormat.class);
-
- job.setMapperClass(LinkDbFilter.class);
- job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
- job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
- job.setReducerClass(Merger.class);
+ job.setReducerClass(LinkDbMerger.class);
job.setOutputPath(newLinkDb);
job.setOutputFormat(MapFileOutputFormat.class);
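
The reduce() removed above and the deleted inner Merger class were two copies of the same inlink-capping logic; after this change the single copy lives in LinkDbMerger.reduce(). Below is a minimal sketch of that shared capping behavior, using plain collections instead of Nutch's Inlinks/Inlink Writables (the class and method names in the sketch are illustrative only, not part of Nutch):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustrative stand-in for the capping loop in LinkDbMerger.reduce():
// copy inlinks from each incoming group until maxInlinks is reached.
public class InlinkCapSketch {

  static List merge(List groups, int maxInlinks) {
    List result = new ArrayList();
    for (Iterator g = groups.iterator(); g.hasNext(); ) {
      List group = (List) g.next();
      // Same bound as Math.min(maxInlinks - result.size(), inlinks.size())
      // in the real reducer: never grow result past the cap.
      int end = Math.min(maxInlinks - result.size(), group.size());
      for (int i = 0; i < end; i++) {
        result.add(group.get(i));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List groups = Arrays.asList(
        Arrays.asList("a.com", "b.com", "c.com"),
        Arrays.asList("d.com", "e.com"));
    // With a cap of 4, prints [a.com, b.com, c.com, d.com]
    System.out.println(merge(groups, 4));
  }
}
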
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java?view=diff&rev=551098&r1=551097&r2=551098
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java Wed Jun 27 01:39:22 2007
@@ -16,18 +16,29 @@
*/
package org.apache.nutch.crawl;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolBase;
import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
/**
* This tool merges several LinkDb-s into one, optionally filtering
@@ -47,9 +58,11 @@
*
* @author Andrzej Bialecki
*/
-public class LinkDbMerger extends ToolBase {
+public class LinkDbMerger extends ToolBase implements Reducer {
private static final Log LOG = LogFactory.getLog(LinkDbMerger.class);
+ private int maxInlinks;
+
public LinkDbMerger() {
}
@@ -58,8 +71,33 @@
setConf(conf);
}
+ public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+
+ Inlinks result = new Inlinks();
+
+ while (values.hasNext()) {
+ Inlinks inlinks = (Inlinks)values.next();
+
+ int end = Math.min(maxInlinks - result.size(), inlinks.size());
+ Iterator it = inlinks.iterator();
+ int i = 0;
+ while(it.hasNext() && i++ < end) {
+ result.add((Inlink)it.next());
+ }
+ }
+ if (result.size() == 0) return;
+ output.collect(key, result);
+
+ }
+
+ public void configure(JobConf job) {
+ maxInlinks = job.getInt("db.max.inlinks", 10000);
+ }
+
+ public void close() throws IOException { }
+
public void merge(Path output, Path[] dbs, boolean normalize, boolean filter) throws Exception {
- JobConf job = LinkDb.createMergeJob(getConf(), output, normalize, filter);
+ JobConf job = createMergeJob(getConf(), output, normalize, filter);
for (int i = 0; i < dbs.length; i++) {
job.addInputPath(new Path(dbs[i], LinkDb.CURRENT_NAME));
}
@@ -68,6 +106,31 @@
fs.mkdirs(output);
fs.rename(job.getOutputPath(), new Path(output, LinkDb.CURRENT_NAME));
}
+
+ public static JobConf createMergeJob(Configuration config, Path linkDb, boolean normalize, boolean filter) {
+ Path newLinkDb =
+ new Path("linkdb-merge-" +
+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+ JobConf job = new NutchJob(config);
+ job.setJobName("linkdb merge " + linkDb);
+
+ job.setInputFormat(SequenceFileInputFormat.class);
+
+ job.setMapperClass(LinkDbFilter.class);
+ job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
+ job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
+ job.setReducerClass(LinkDbMerger.class);
+
+ job.setOutputPath(newLinkDb);
+ job.setOutputFormat(MapFileOutputFormat.class);
+ job.setBoolean("mapred.output.compress", true);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Inlinks.class);
+
+ return job;
+ }
+
/**
* @param args
*/
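
With this refactoring, callers obtain the merge job from LinkDbMerger rather than LinkDb. A minimal driver sketch against the old Hadoop mapred API used in this patch; the crawl/linkdb* paths are placeholders, and LinkDbMerger.merge() above does the equivalent by adding each input db's LinkDb.CURRENT_NAME subdirectory and renaming the result into place:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.nutch.crawl.LinkDb;
import org.apache.nutch.crawl.LinkDbMerger;
import org.apache.nutch.util.NutchConfiguration;

public class LinkDbMergeDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = NutchConfiguration.create();
    // normalize = true, filter = true: apply LinkDbFilter while merging
    JobConf job = LinkDbMerger.createMergeJob(conf, new Path("crawl/linkdb"),
        true, true);
    // Each input is an existing linkdb's "current" directory.
    job.addInputPath(new Path(new Path("crawl/linkdb1"), LinkDb.CURRENT_NAME));
    job.addInputPath(new Path(new Path("crawl/linkdb2"), LinkDb.CURRENT_NAME));
    JobClient.runJob(job);
    // The merged data lands in job.getOutputPath(); merge() then renames
    // it to <output>/current.
  }
}
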