Author: dogacan
Date: Wed Jun 27 01:39:22 2007
New Revision: 551098

URL: http://svn.apache.org/viewvc?view=rev&rev=551098
Log:
NUTCH-499 - Refactor LinkDb and LinkDbMerger to reuse code.

Modified:
    lucene/nutch/trunk/CHANGES.txt
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java

Modified: lucene/nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/CHANGES.txt?view=diff&rev=551098&r1=551097&r2=551098
==============================================================================
--- lucene/nutch/trunk/CHANGES.txt (original)
+++ lucene/nutch/trunk/CHANGES.txt Wed Jun 27 01:39:22 2007
@@ -70,6 +70,8 @@
 22. NUTCH-434 - Replace usage of ObjectWritable with something based on 
     GenericWritable. (dogacan)
 
+23. NUTCH-499 - Refactor LinkDb and LinkDbMerger to reuse code. (dogacan)
+
 Release 0.9 - 2007-04-02
 
  1. Changed log4j confiquration to log to stdout on commandline

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java?view=diff&rev=551098&r1=551097&r2=551098
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java Wed Jun 27 01:39:22 2007
@@ -41,7 +41,7 @@
 import org.apache.nutch.util.NutchJob;
 
 /** Maintains an inverted link map, listing incoming links for each url. */
-public class LinkDb extends ToolBase implements Mapper, Reducer {
+public class LinkDb extends ToolBase implements Mapper {
 
   public static final Log LOG = LogFactory.getLog(LinkDb.class);
 
@@ -49,41 +49,10 @@
   public static final String LOCK_NAME = ".locked";
 
   private int maxAnchorLength;
-  private int maxInlinks;
   private boolean ignoreInternalLinks;
   private URLFilters urlFilters;
   private URLNormalizers urlNormalizers;
   
-  public static class Merger extends MapReduceBase implements Reducer {
-    private int _maxInlinks;
-    
-    public void configure(JobConf job) {
-      super.configure(job);
-      _maxInlinks = job.getInt("db.max.inlinks", 10000);
-    }
-
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
-      Inlinks inlinks = null;
-      while (values.hasNext()) {
-        if (inlinks == null) {
-          inlinks = (Inlinks)values.next();
-          continue;
-        }
-        Inlinks val = (Inlinks)values.next();
-        for (Iterator it = val.iterator(); it.hasNext(); ) {
-          if (inlinks.size() >= _maxInlinks) {
-            output.collect(key, inlinks);
-            return;
-          }
-          Inlink in = (Inlink)it.next();
-          inlinks.add(in);
-        }
-      }
-      if (inlinks.size() == 0) return;
-      output.collect(key, inlinks);
-    }
-  }
-
   public LinkDb() {
     
   }
@@ -94,7 +63,6 @@
   
   public void configure(JobConf job) {
     maxAnchorLength = job.getInt("db.max.anchor.length", 100);
-    maxInlinks = job.getInt("db.max.inlinks", 10000);
     ignoreInternalLinks = job.getBoolean("db.ignore.internal.links", true);
     if (job.getBoolean(LinkDbFilter.URL_FILTERING, false)) {
       urlFilters = new URLFilters(job);
@@ -176,26 +144,6 @@
     }
   }
 
-  public void reduce(WritableComparable key, Iterator values,
-                     OutputCollector output, Reporter reporter)
-    throws IOException {
-
-    Inlinks result = new Inlinks();
-
-    while (values.hasNext()) {
-      Inlinks inlinks = (Inlinks)values.next();
-
-      int end = Math.min(maxInlinks - result.size(), inlinks.size());
-      Iterator it = inlinks.iterator();
-      int i = 0;
-      while(it.hasNext() && i++ < end) {
-        result.add((Inlink)it.next());
-      }
-    }
-    if (result.size() == 0) return;
-    output.collect(key, result);
-  }
-
   public void invert(Path linkDb, final Path segmentsDir, boolean normalize, boolean filter, boolean force) throws IOException {
     final FileSystem fs = FileSystem.get(getConf());
     Path[] files = fs.listPaths(segmentsDir, new PathFilter() {
@@ -240,7 +188,7 @@
       }
       // try to merge
       Path newLinkDb = job.getOutputPath();
-      job = LinkDb.createMergeJob(getConf(), linkDb, normalize, filter);
+      job = LinkDbMerger.createMergeJob(getConf(), linkDb, normalize, filter);
       job.addInputPath(currentLinkDb);
       job.addInputPath(newLinkDb);
       try {
@@ -279,31 +227,7 @@
         LOG.warn("LinkDb createJob: " + e);
       }
     }
-    job.setReducerClass(LinkDb.class);
-
-    job.setOutputPath(newLinkDb);
-    job.setOutputFormat(MapFileOutputFormat.class);
-    job.setBoolean("mapred.output.compress", true);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(Inlinks.class);
-
-    return job;
-  }
-
-  public static JobConf createMergeJob(Configuration config, Path linkDb, boolean normalize, boolean filter) {
-    Path newLinkDb =
-      new Path("linkdb-merge-" + 
-               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
-
-    JobConf job = new NutchJob(config);
-    job.setJobName("linkdb merge " + linkDb);
-
-    job.setInputFormat(SequenceFileInputFormat.class);
-
-    job.setMapperClass(LinkDbFilter.class);
-    job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
-    job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
-    job.setReducerClass(Merger.class);
+    job.setReducerClass(LinkDbMerger.class);
 
     job.setOutputPath(newLinkDb);
     job.setOutputFormat(MapFileOutputFormat.class);

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java?view=diff&rev=551098&r1=551097&r2=551098
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java Wed Jun 27 01:39:22 2007
@@ -16,18 +16,29 @@
  */
 package org.apache.nutch.crawl;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolBase;
 import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
 
 /**
  * This tool merges several LinkDb-s into one, optionally filtering
@@ -47,9 +58,11 @@
  * 
  * @author Andrzej Bialecki
  */
-public class LinkDbMerger extends ToolBase {
+public class LinkDbMerger extends ToolBase implements Reducer {
   private static final Log LOG = LogFactory.getLog(LinkDbMerger.class);
   
+  private int maxInlinks;
+  
   public LinkDbMerger() {
     
   }
@@ -58,8 +71,33 @@
     setConf(conf);
   }
 
+  public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+
+    Inlinks result = new Inlinks();
+
+    while (values.hasNext()) {
+      Inlinks inlinks = (Inlinks)values.next();
+
+      int end = Math.min(maxInlinks - result.size(), inlinks.size());
+      Iterator it = inlinks.iterator();
+      int i = 0;
+      while(it.hasNext() && i++ < end) {
+        result.add((Inlink)it.next());
+      }
+    }
+    if (result.size() == 0) return;
+    output.collect(key, result);
+    
+  }
+
+  public void configure(JobConf job) {
+    maxInlinks = job.getInt("db.max.inlinks", 10000);
+  }
+
+  public void close() throws IOException { }
+
   public void merge(Path output, Path[] dbs, boolean normalize, boolean filter) throws Exception {
-    JobConf job = LinkDb.createMergeJob(getConf(), output, normalize, filter);
+    JobConf job = createMergeJob(getConf(), output, normalize, filter);
     for (int i = 0; i < dbs.length; i++) {
       job.addInputPath(new Path(dbs[i], LinkDb.CURRENT_NAME));      
     }
@@ -68,6 +106,31 @@
     fs.mkdirs(output);
     fs.rename(job.getOutputPath(), new Path(output, LinkDb.CURRENT_NAME));
   }
+
+  public static JobConf createMergeJob(Configuration config, Path linkDb, boolean normalize, boolean filter) {
+    Path newLinkDb =
+      new Path("linkdb-merge-" + 
+               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf job = new NutchJob(config);
+    job.setJobName("linkdb merge " + linkDb);
+
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    job.setMapperClass(LinkDbFilter.class);
+    job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
+    job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
+    job.setReducerClass(LinkDbMerger.class);
+
+    job.setOutputPath(newLinkDb);
+    job.setOutputFormat(MapFileOutputFormat.class);
+    job.setBoolean("mapred.output.compress", true);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Inlinks.class);
+
+    return job;
+  }
+  
   /**
    * @param args
    */

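For reference, a hedged usage sketch of the consolidated merger after this
change. The Configuration-taking constructor is implied by the context lines in
the hunk above; the paths and the normalize/filter flag values are illustrative,
not part of the commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.nutch.crawl.LinkDbMerger;
    import org.apache.nutch.util.NutchConfiguration;

    public class MergeLinkDbsSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = NutchConfiguration.create();
        LinkDbMerger merger = new LinkDbMerger(conf);
        // Hypothetical input linkdbs; merge() appends LinkDb.CURRENT_NAME
        // under each of these paths when adding job inputs.
        Path[] dbs = { new Path("crawl/linkdb1"), new Path("crawl/linkdb2") };
        // normalize=true, filter=true route keys through LinkDbFilter in the map phase.
        merger.merge(new Path("crawl/linkdb-merged"), dbs, true, true);
      }
    }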

