Author: ab
Date: Mon Apr  3 04:19:43 2006
New Revision: 391003

URL: http://svn.apache.org/viewcvs?rev=391003&view=rev
Log:
Add a -topN option to the reader. This collects the indicated number of
top-scoring URLs in a CrawlDB into a sorted list. Such a list is useful
for identifying scoring problems (e.g. link spam).

Development of this functionality was supported by Krugle.net. Thank you!
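
For reference, the new mode is invoked as "CrawlDbReader <crawldb> -topN
<nnnn> <out_dir> [<min>]" (see the usage message in the diff below), or
programmatically via processTopNJob(). A minimal driver sketch, assuming
the usual NutchConfiguration factory; the class name and paths are
illustrative only, not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.nutch.crawl.CrawlDbReader;
    import org.apache.nutch.util.NutchConfiguration;

    public class TopNDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = NutchConfiguration.create();
        CrawlDbReader dbr = new CrawlDbReader();
        // collect the 1000 top-scoring URLs, skipping scores below 0.5
        dbr.processTopNJob("crawl/crawldb", 1000L, 0.5f, "topN-out", conf);
      }
    }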

Modified:
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java?rev=391003&r1=391002&r2=391003&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java Mon Apr  3 04:19:43 2006
@@ -19,10 +19,13 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Random;
 import java.util.TreeMap;
 import java.util.logging.Logger;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.SequenceFile;
@@ -40,9 +43,9 @@
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.util.LogFormatter;
-import org.apache.hadoop.conf.Configuration;
-
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchJob;
 
@@ -135,6 +138,46 @@
     public void close() {}
   }
   
+  public static class CrawlDbTopNMapper implements Mapper {
+    private static final FloatWritable fw = new FloatWritable();
+    private float min = 0.0f;
+    
+    public void configure(JobConf job) {
+      long lmin = job.getLong("CrawlDbReader.topN.min", 0);
+      if (lmin != 0) {
+        min = (float)lmin / 1000000.0f;
+      }
+    }
+    public void close() {}
+    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter)
+            throws IOException {
+      CrawlDatum datum = (CrawlDatum)value;
+      if (datum.getScore() < min) return; // don't collect low-scoring records
+      fw.set(-datum.getScore()); // reverse sorting order
+      output.collect(fw, key); // invert mapping: score -> url
+    }
+  }
+  
+  public static class CrawlDbTopNReducer implements Reducer {
+    private long topN;
+    private long count = 0L;
+    
+    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+      while (values.hasNext() && count < topN) {
+        FloatWritable fw = (FloatWritable)key;
+        fw.set(-fw.get());
+        output.collect(fw, (Writable)values.next());
+        count++;
+      }
+    }
+
+    public void configure(JobConf job) {
+      topN = job.getLong("CrawlDbReader.topN", 100) / job.getNumReduceTasks();
+    }
+    
+    public void close() {}
+  }
+  
  public void processStatJob(String crawlDb, Configuration config) throws IOException {
     LOG.info("CrawlDb statistics start: " + crawlDb);
    File tmpFolder = new File(crawlDb, "stat_tmp" + System.currentTimeMillis());
@@ -241,17 +284,73 @@
     job.setOutputValueClass(CrawlDatum.class);
 
     JobClient.runJob(job);
+    LOG.info("CrawlDb dump: done");
+  }
+
+  public void processTopNJob(String crawlDb, long topN, float min, String output, Configuration config) throws IOException {
+    LOG.info("CrawlDb topN: starting (topN=" + topN + ", min=" + min + ")");
+    LOG.info("CrawlDb db: " + crawlDb);
+    File outFolder = new File(output);
+    File tempDir =
+      new File(config.get("mapred.temp.dir", ".") +
+               "/readdb-topN-temp-"+
+               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf job = new NutchJob(config);
+    job.addInputDir(new File(crawlDb, CrawlDatum.DB_DIR_NAME));
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(UTF8.class);
+    job.setInputValueClass(CrawlDatum.class);
+    job.setMapperClass(CrawlDbTopNMapper.class);
+    job.setReducerClass(IdentityReducer.class);
+
+    job.setOutputDir(tempDir);
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(FloatWritable.class);
+    job.setOutputValueClass(UTF8.class);
+
+    // XXX hmmm, no setFloat() in the API ... :(
+    job.setLong("CrawlDbReader.topN.min", Math.round(1000000.0 * min));
+    JobClient.runJob(job); 
+    
+    LOG.info("CrawlDb topN: collecting topN scores.");
+    job = new NutchJob(config);
+    job.setLong("CrawlDbReader.topN", topN);
+
+    job.addInputDir(tempDir);
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(FloatWritable.class);
+    job.setInputValueClass(UTF8.class);
+    job.setMapperClass(IdentityMapper.class);
+    job.setReducerClass(CrawlDbTopNReducer.class);
+
+    job.setOutputDir(outFolder);
+    job.setOutputFormat(TextOutputFormat.class);
+    job.setOutputKeyClass(FloatWritable.class);
+    job.setOutputValueClass(UTF8.class);
+
+    // XXX *sigh* this apparently doesn't work ... :-((
+    job.setNumReduceTasks(1); // create a single file.
+    
+    JobClient.runJob(job);
+    FileSystem fs = FileSystem.get(config);
+    fs.delete(tempDir);
+    LOG.info("CrawlDb topN: done");
+
   }
 
   public static void main(String[] args) throws IOException {
     CrawlDbReader dbr = new CrawlDbReader();
 
     if (args.length < 1) {
-      System.err.println("Usage: CrawlDbReader <crawldb> (-stats | -dump <out_dir> | -url <url>)");
+      System.err.println("Usage: CrawlDbReader <crawldb> (-stats | -dump <out_dir> | -topN <nnnn> <out_dir> [<min>] | -url <url>)");
      System.err.println("\t<crawldb>\tdirectory name where crawldb is located");
       System.err.println("\t-stats\tprint overall statistics to System.out");
      System.err.println("\t-dump <out_dir>\tdump the whole db to a text file in <out_dir>");
      System.err.println("\t-url <url>\tprint information on <url> to System.out");
+      System.err.println("\t-topN <nnnn> <out_dir> [<min>]\tdump top <nnnn> urls sorted by score to <out_dir>");
+      System.err.println("\t\t[<min>]\tskip records with scores below this value.");
+      System.err.println("\t\t\tThis can significantly improve performance.");
       return;
     }
     String param = null;
@@ -266,6 +365,15 @@
       } else if (args[i].equals("-url")) {
         param = args[++i];
         dbr.readUrl(crawlDb, param, conf);
+      } else if (args[i].equals("-topN")) {
+        param = args[++i];
+        long topN = Long.parseLong(param);
+        param = args[++i];
+        float min = 0.0f;
+        if (i < args.length - 1) {
+          min = Float.parseFloat(args[++i]);
+        }
+        dbr.processTopNJob(crawlDb, topN, min, param, conf);
       } else {
         System.err.println("\nError: wrong argument " + args[i]);
       }
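
The feature runs as two MapReduce jobs: the first negates each score and
emits (score, url) pairs, so that the ascending sort on FloatWritable keys
during the shuffle yields descending order by score; the second passes the
sorted pairs through an identity mapper and lets the reducer emit the first
topN entries, flipping the sign back. A minimal local sketch of the same
idea, assuming no Hadoop at all (note that a TreeMap collapses duplicate
scores, which the real job avoids by iterating all values per key):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    public class TopNSketch {
      public static void main(String[] args) {
        Map<String, Float> scores = new HashMap<String, Float>();
        scores.put("http://a.example/", 1.8f);
        scores.put("http://b.example/", 0.3f);
        scores.put("http://c.example/", 2.5f);

        long topN = 2;
        // phase 1: invert to (negated score -> url); TreeMap sorts ascending
        TreeMap<Float, String> inverted = new TreeMap<Float, String>();
        for (Map.Entry<String, Float> e : scores.entrySet())
          inverted.put(-e.getValue(), e.getKey());
        // phase 2: emit the first topN entries, restoring the sign
        long count = 0;
        for (Map.Entry<Float, String> e : inverted.entrySet()) {
          if (count++ >= topN) break;
          System.out.println(-e.getKey() + "\t" + e.getValue());
        }
      }
    }

The "no setFloat()" workaround stores the float threshold in a long scaled
by one million, which preserves roughly six decimal digits. A round trip of
that encoding (values illustrative):

    public class FloatConfRoundTrip {
      public static void main(String[] args) {
        float min = 0.25f;
        // encode when submitting the job, as processTopNJob does:
        long stored = Math.round(1000000.0 * min);    // 250000
        // decode in the mapper's configure(), as CrawlDbTopNMapper does:
        float decoded = (float)stored / 1000000.0f;   // 0.25
        System.out.println(decoded);
      }
    }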

