Author: ab
Date: Mon Apr 3 04:19:43 2006
New Revision: 391003
URL: http://svn.apache.org/viewcvs?rev=391003&view=rev
Log:
Add a -topN option to the reader. This collects the indicated number of
top scoring URLs in a CrawlDB in a sorted list. Such a list is useful for
identifying scoring problems (e.g. link spam).
Development of this functionality was supported by Krugle.net. Thank you!
Modified:
lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java?rev=391003&r1=391002&r2=391003&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java Mon Apr 3 04:19:43 2006
@@ -19,10 +19,13 @@
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
+import java.util.Random;
import java.util.TreeMap;
import java.util.logging.Logger;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
@@ -40,9 +43,9 @@
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.LogFormatter;
-import org.apache.hadoop.conf.Configuration;
-
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
@@ -135,6 +138,46 @@
public void close() {}
}
+ public static class CrawlDbTopNMapper implements Mapper {
+ private static final FloatWritable fw = new FloatWritable();
+ private float min = 0.0f;
+
+ public void configure(JobConf job) {
+ long lmin = job.getLong("CrawlDbReader.topN.min", 0);
+ if (lmin != 0) {
+ min = (float)lmin / 1000000.0f;
+ }
+ }
+ public void close() {}
+ public void map(WritableComparable key, Writable value, OutputCollector
output, Reporter reporter)
+ throws IOException {
+ CrawlDatum datum = (CrawlDatum)value;
+ if (datum.getScore() < min) return; // don't collect low-scoring records
+ fw.set(-datum.getScore()); // reverse sorting order
+ output.collect(fw, key); // invert mapping: score -> url
+ }
+ }
+
+ public static class CrawlDbTopNReducer implements Reducer {
+ private long topN;
+ private long count = 0L;
+
+ public void reduce(WritableComparable key, Iterator values,
OutputCollector output, Reporter reporter) throws IOException {
+ while (values.hasNext() && count < topN) {
+ FloatWritable fw = (FloatWritable)key;
+ fw.set(-fw.get());
+ output.collect(fw, (Writable)values.next());
+ count++;
+ }
+ }
+
+ public void configure(JobConf job) {
+ topN = job.getLong("CrawlDbReader.topN", 100) / job.getNumReduceTasks();
+ }
+
+ public void close() {}
+ }
+
public void processStatJob(String crawlDb, Configuration config) throws
IOException {
LOG.info("CrawlDb statistics start: " + crawlDb);
File tmpFolder = new File(crawlDb, "stat_tmp" +
System.currentTimeMillis());
@@ -241,17 +284,73 @@
job.setOutputValueClass(CrawlDatum.class);
JobClient.runJob(job);
+ LOG.info("CrawlDb dump: done");
+ }
+
+ public void processTopNJob(String crawlDb, long topN, float min, String
output, Configuration config) throws IOException {
+ LOG.info("CrawlDb topN: starting (topN=" + topN + ", min=" + min + ")");
+ LOG.info("CrawlDb db: " + crawlDb);
+ File outFolder = new File(output);
+ File tempDir =
+ new File(config.get("mapred.temp.dir", ".") +
+ "/readdb-topN-temp-"+
+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+ JobConf job = new NutchJob(config);
+ job.addInputDir(new File(crawlDb, CrawlDatum.DB_DIR_NAME));
+ job.setInputFormat(SequenceFileInputFormat.class);
+ job.setInputKeyClass(UTF8.class);
+ job.setInputValueClass(CrawlDatum.class);
+ job.setMapperClass(CrawlDbTopNMapper.class);
+ job.setReducerClass(IdentityReducer.class);
+
+ job.setOutputDir(tempDir);
+ job.setOutputFormat(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(FloatWritable.class);
+ job.setOutputValueClass(UTF8.class);
+
+ // XXX hmmm, no setFloat() in the API ... :(
+ job.setLong("CrawlDbReader.topN.min", Math.round(1000000.0 * min));
+ JobClient.runJob(job);
+
+ LOG.info("CrawlDb topN: collecting topN scores.");
+ job = new NutchJob(config);
+ job.setLong("CrawlDbReader.topN", topN);
+
+ job.addInputDir(tempDir);
+ job.setInputFormat(SequenceFileInputFormat.class);
+ job.setInputKeyClass(FloatWritable.class);
+ job.setInputValueClass(UTF8.class);
+ job.setMapperClass(IdentityMapper.class);
+ job.setReducerClass(CrawlDbTopNReducer.class);
+
+ job.setOutputDir(outFolder);
+ job.setOutputFormat(TextOutputFormat.class);
+ job.setOutputKeyClass(FloatWritable.class);
+ job.setOutputValueClass(UTF8.class);
+
+ // XXX *sigh* this apparently doesn't work ... :-((
+ job.setNumReduceTasks(1); // create a single file.
+
+ JobClient.runJob(job);
+ FileSystem fs = FileSystem.get(config);
+ fs.delete(tempDir);
+ LOG.info("CrawlDb topN: done");
+
}
public static void main(String[] args) throws IOException {
CrawlDbReader dbr = new CrawlDbReader();
if (args.length < 1) {
- System.err.println("Usage: CrawlDbReader <crawldb> (-stats | -dump
<out_dir> | -url <url>)");
+ System.err.println("Usage: CrawlDbReader <crawldb> (-stats | -dump
<out_dir> | -topN <nnnn> <out_dir> [<min>] | -url <url>)");
System.err.println("\t<crawldb>\tdirectory name where crawldb is
located");
System.err.println("\t-stats\tprint overall statistics to System.out");
System.err.println("\t-dump <out_dir>\tdump the whole db to a text file
in <out_dir>");
System.err.println("\t-url <url>\tprint information on <url> to
System.out");
+ System.err.println("\t-topN <nnnn> <out_dir> [<min>]\tdump top <nnnn>
urls sorted by score to <out_dir>");
+ System.err.println("\t\t[<min>]\tskip records with scores below this
value.");
+ System.err.println("\t\t\tThis can significantly improve performance.");
return;
}
String param = null;
@@ -266,6 +365,15 @@
} else if (args[i].equals("-url")) {
param = args[++i];
dbr.readUrl(crawlDb, param, conf);
+ } else if (args[i].equals("-topN")) {
+ param = args[++i];
+ long topN = Long.parseLong(param);
+ param = args[++i];
+ float min = 0.0f;
+ if (i < args.length - 1) {
+ min = Float.parseFloat(args[++i]);
+ }
+ dbr.processTopNJob(crawlDb, topN, min, param, conf);
} else {
System.err.println("\nError: wrong argument " + args[i]);
}