Author: ab
Date: Tue Jan 15 14:02:52 2008
New Revision: 612245

URL: http://svn.apache.org/viewvc?rev=612245&view=rev
Log:
CrawlDbReader: add some new stats + dump into a csv format
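
The new command-line options introduced by this patch are "-stats [-sort]" and "-dump <out_dir> [-format normal|csv]". Below is a minimal sketch, not part of the patch, of how the changed processStatJob/processDumpJob signatures could be driven directly; the wrapper class name and the crawldb/output paths are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.nutch.crawl.CrawlDbReader;
    import org.apache.nutch.util.NutchConfiguration;

    public class CrawlDbReaderUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = NutchConfiguration.create();
        CrawlDbReader dbr = new CrawlDbReader();

        // -stats -sort: the new boolean argument enables the per-host status
        // breakdown (propagated to the job as "db.reader.stats.sort").
        dbr.processStatJob("crawl/crawldb", conf, true);

        // -dump -format csv: CSV_FORMAT selects the new CrawlDatumCsvOutputFormat;
        // STD_FORMAT keeps the previous plain-text dump.
        dbr.processDumpJob("crawl/crawldb", "crawl/dump-csv", conf, CrawlDbReader.CSV_FORMAT);
      }
    }

From the shell, the same options should be reachable through the usual readdb entry point, e.g. "bin/nutch readdb crawl/crawldb -stats -sort" or "bin/nutch readdb crawl/crawldb -dump dump-csv -format csv".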

Modified:
    lucene/nutch/trunk/CHANGES.txt
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java

Modified: lucene/nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/CHANGES.txt?rev=612245&r1=612244&r2=612245&view=diff
==============================================================================
--- lucene/nutch/trunk/CHANGES.txt (original)
+++ lucene/nutch/trunk/CHANGES.txt Tue Jan 15 14:02:52 2008
@@ -184,6 +184,9 @@
 
 63. NUTCH-534 - SegmentMerger: add -normalize option (Emmanuel Joke via ab)
 
+64. NUTCH-528 - CrawlDbReader: add some new stats + dump into a CSV format
+    (Emmanuel Joke via ab)
+
 
 Release 0.9 - 2007-04-02
 

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java?rev=612245&r1=612244&r2=612245&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java Tue Jan 15 14:02:52 2008
@@ -17,7 +17,10 @@
 
 package org.apache.nutch.crawl;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.net.URL;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Random;
@@ -36,13 +39,13 @@
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapFileOutputFormat;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.OutputFormatBase;
+import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
@@ -51,8 +54,10 @@
 import org.apache.hadoop.mapred.lib.HashPartitioner;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Progressable;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.StringUtil;
 
 /**
  * Read utility for the CrawlDB.
@@ -64,6 +69,9 @@
 
   public static final Log LOG = LogFactory.getLog(CrawlDbReader.class);
   
+  public static final int STD_FORMAT = 0;
+  public static final int CSV_FORMAT = 1;
+    
   private MapFile.Reader[] readers = null;
   
   private void openReaders(String crawlDb, Configuration config) throws IOException {
@@ -82,28 +90,88 @@
       }
     }
   }
+  
+  public static class CrawlDatumCsvOutputFormat extends OutputFormatBase<Text,CrawlDatum> {
+    protected static class LineRecordWriter implements RecordWriter<Text,CrawlDatum> {
+      private DataOutputStream out;
+
+      public LineRecordWriter(DataOutputStream out) {
+        this.out = out;
+        try {
+          out.writeBytes("Url;Status code;Status name;Fetch Time;Modified Time;Retries since fetch;Retry interval;Score;Signature;Metadata\n");
+        } catch (IOException e) {}
+      }
+
+      public synchronized void write(Text key, CrawlDatum value) throws IOException {
+          out.writeByte('"');
+          out.writeBytes(key.toString());
+          out.writeByte('"');
+          out.writeByte(';');
+          out.writeBytes(Integer.toString(value.getStatus()));
+          out.writeByte(';');
+          out.writeByte('"');
+          out.writeBytes(CrawlDatum.getStatusName(value.getStatus()));
+          out.writeByte('"');
+          out.writeByte(';');
+          out.writeBytes(new Date(value.getFetchTime()).toString());
+          out.writeByte(';');
+          out.writeBytes(new Date(value.getModifiedTime()).toString());
+          out.writeByte(';');
+          out.writeBytes(Integer.toString(value.getRetriesSinceFetch()));
+          out.writeByte(';');
+          out.writeBytes(Float.toString(value.getFetchInterval()));
+          out.writeByte(';');
+          out.writeBytes(Float.toString((value.getFetchInterval() / FetchSchedule.SECONDS_PER_DAY)));
+          out.writeByte(';');
+          out.writeBytes(Float.toString(value.getScore()));
+          out.writeByte(';');
+          out.writeByte('"');
+          out.writeBytes(value.getSignature() != null ? StringUtil.toHexString(value.getSignature()): "null");
+          out.writeByte('"');
+          out.writeByte('\n');
+      }
+
+      public synchronized void close(Reporter reporter) throws IOException {
+        out.close();
+      }
+    }
+
+    public RecordWriter<Text,CrawlDatum> getRecordWriter(FileSystem fs, JobConf job, String name,
+        Progressable progress) throws IOException {
+      Path dir = job.getOutputPath();
+      DataOutputStream fileOut = fs.create(new Path(dir, name), progress);
+      return new LineRecordWriter(fileOut);
+   }
+  }
 
-  public static class CrawlDbStatMapper implements Mapper {
+  public static class CrawlDbStatMapper implements Mapper<Text, CrawlDatum, Text, LongWritable> {
     LongWritable COUNT_1 = new LongWritable(1);
-    public void configure(JobConf job) {}
+    private boolean sort = false;
+    public void configure(JobConf job) {
+      sort = job.getBoolean("db.reader.stats.sort", false );
+    }
     public void close() {}
-    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter)
+    public void map(Text key, CrawlDatum value, OutputCollector<Text, LongWritable> output, Reporter reporter)
             throws IOException {
-      CrawlDatum cd = (CrawlDatum) value;
       output.collect(new Text("T"), COUNT_1);
-      output.collect(new Text("status " + cd.getStatus()), COUNT_1);
-      output.collect(new Text("retry " + cd.getRetriesSinceFetch()), COUNT_1);
-      output.collect(new Text("s"), new LongWritable((long) (cd.getScore() * 1000.0)));
+      output.collect(new Text("status " + value.getStatus()), COUNT_1);
+      output.collect(new Text("retry " + value.getRetriesSinceFetch()), COUNT_1);
+      output.collect(new Text("s"), new LongWritable((long) (value.getScore() * 1000.0)));
+      if(sort){
+        URL u = new URL(key.toString());
+        String host = u.getHost();
+        output.collect(new Text("status " + value.getStatus() + " " + host), COUNT_1);
+      }
     }
   }
   
-  public static class CrawlDbStatCombiner implements Reducer {
+  public static class CrawlDbStatCombiner implements Reducer<Text, LongWritable, Text, LongWritable> {
     LongWritable val = new LongWritable();
     
     public CrawlDbStatCombiner() { }
     public void configure(JobConf job) { }
     public void close() {}
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
+    public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter)
         throws IOException {
       val.set(0L);
       String k = ((Text)key).toString();
@@ -130,10 +198,10 @@
     }
   }
 
-  public static class CrawlDbStatReducer implements Reducer {
+  public static class CrawlDbStatReducer implements Reducer<Text, LongWritable, Text, LongWritable> {
     public void configure(JobConf job) {}
     public void close() {}
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
+    public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter)
             throws IOException {
 
       String k = ((Text) key).toString();
@@ -177,53 +245,39 @@
     }
   }
 
-  public static class CrawlDbDumpReducer implements Reducer {
-
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
-      while (values.hasNext()) {
-        output.collect(key, (Writable)values.next());
-      }
-    }
-
-    public void configure(JobConf job) {}
-    public void close() {}
-  }
-  
-  public static class CrawlDbTopNMapper implements Mapper {
+  public static class CrawlDbTopNMapper implements Mapper<Text, CrawlDatum, FloatWritable, Text> {
     private static final FloatWritable fw = new FloatWritable();
     private float min = 0.0f;
     
     public void configure(JobConf job) {
-      long lmin = job.getLong("CrawlDbReader.topN.min", 0);
+      long lmin = job.getLong("db.reader.topn.min", 0);
       if (lmin != 0) {
         min = (float)lmin / 1000000.0f;
       }
     }
     public void close() {}
-    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter)
+    public void map(Text key, CrawlDatum value, OutputCollector<FloatWritable, Text> output, Reporter reporter)
             throws IOException {
-      CrawlDatum datum = (CrawlDatum)value;
-      if (datum.getScore() < min) return; // don't collect low-scoring records
-      fw.set(-datum.getScore()); // reverse sorting order
+      if (value.getScore() < min) return; // don't collect low-scoring records
+      fw.set(-value.getScore()); // reverse sorting order
       output.collect(fw, key); // invert mapping: score -> url
     }
   }
   
-  public static class CrawlDbTopNReducer implements Reducer {
+  public static class CrawlDbTopNReducer implements Reducer<FloatWritable, Text, FloatWritable, Text> {
     private long topN;
     private long count = 0L;
     
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+    public void reduce(FloatWritable key, Iterator<Text> values, OutputCollector<FloatWritable, Text> output, Reporter reporter) throws IOException {
       while (values.hasNext() && count < topN) {
-        FloatWritable fw = (FloatWritable)key;
-        fw.set(-fw.get());
-        output.collect(fw, (Writable)values.next());
+        key.set(-key.get());
+        output.collect(key, values.next());
         count++;
       }
     }
 
     public void configure(JobConf job) {
-      topN = job.getLong("CrawlDbReader.topN", 100) / job.getNumReduceTasks();
+      topN = job.getLong("db.reader.topn", 100) / job.getNumReduceTasks();
     }
     
     public void close() {}
@@ -233,7 +287,7 @@
     closeReaders();
   }
   
-  public void processStatJob(String crawlDb, Configuration config) throws IOException {
+  public void processStatJob(String crawlDb, Configuration config, boolean sort) throws IOException {
 
     if (LOG.isInfoEnabled()) {
       LOG.info("CrawlDb statistics start: " + crawlDb);
@@ -243,6 +297,7 @@
 
     JobConf job = new NutchJob(config);
     job.setJobName("stats " + crawlDb);
+    job.setBoolean("db.reader.stats.sort", sort);
 
     job.addInputPath(new Path(crawlDb, CrawlDb.CURRENT_NAME));
     job.setInputFormat(SequenceFileInputFormat.class);
@@ -303,8 +358,10 @@
         } else if (k.equals("sct")) {
           LOG.info("avg score:\t" + (float) ((((double)val.get()) / totalCnt.get()) / 1000.0));
         } else if (k.startsWith("status")) {
-          int code = Integer.parseInt(k.substring(k.indexOf(' ') + 1));
-          LOG.info(k + " (" + CrawlDatum.getStatusName((byte)code) + "):\t" + val);
+          String[] st = k.split(" ");
+          int code = Integer.parseInt(st[1]);
+          if(st.length >2 ) LOG.info("   " + st[2] +" :\t" + val);
+          else LOG.info(st[0] +" " +code + " (" + CrawlDatum.getStatusName((byte) code) + "):\t" + val);
         } else LOG.info(k + ":\t" + val);
       }
     }
@@ -332,7 +389,7 @@
     }
   }
   
-  public void processDumpJob(String crawlDb, String output, Configuration config) throws IOException {
+  public void processDumpJob(String crawlDb, String output, Configuration config, int format) throws IOException {
 
     if (LOG.isInfoEnabled()) {
       LOG.info("CrawlDb dump: starting");
@@ -348,7 +405,8 @@
     job.setInputFormat(SequenceFileInputFormat.class);
 
     job.setOutputPath(outFolder);
-    job.setOutputFormat(TextOutputFormat.class);
+    if(format == CSV_FORMAT) job.setOutputFormat(CrawlDatumCsvOutputFormat.class);
+    else job.setOutputFormat(TextOutputFormat.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(CrawlDatum.class);
 
@@ -382,7 +440,7 @@
     job.setOutputValueClass(Text.class);
 
     // XXX hmmm, no setFloat() in the API ... :(
-    job.setLong("CrawlDbReader.topN.min", Math.round(1000000.0 * min));
+    job.setLong("db.reader.topn.min", Math.round(1000000.0 * min));
     JobClient.runJob(job); 
     
     if (LOG.isInfoEnabled()) {
@@ -390,7 +448,7 @@
     }
     job = new NutchJob(config);
     job.setJobName("topN collect " + crawlDb);
-    job.setLong("CrawlDbReader.topN", topN);
+    job.setLong("db.reader.topn", topN);
 
     job.addInputPath(tempDir);
     job.setInputFormat(SequenceFileInputFormat.class);
@@ -418,8 +476,11 @@
     if (args.length < 1) {
       System.err.println("Usage: CrawlDbReader <crawldb> (-stats | -dump <out_dir> | -topN <nnnn> <out_dir> [<min>] | -url <url>)");
       System.err.println("\t<crawldb>\tdirectory name where crawldb is located");
-      System.err.println("\t-stats\tprint overall statistics to System.out");
-      System.err.println("\t-dump <out_dir>\tdump the whole db to a text file in <out_dir>");
+      System.err.println("\t-stats [-sort] \tprint overall statistics to System.out");
+      System.err.println("\t\t[-sort]\tlist status sorted by host");
+      System.err.println("\t-dump <out_dir> [-format normal|csv ]\tdump the whole db to a text file in <out_dir>");
+      System.err.println("\t\t[-format csv]\tdump in Csv format");
+      System.err.println("\t\t[-format normal]\tdump in standard format (default option)");
       System.err.println("\t-url <url>\tprint information on <url> to System.out");
       System.err.println("\t-topN <nnnn> <out_dir> [<min>]\tdump top <nnnn> urls sorted by score to <out_dir>");
       System.err.println("\t\t[<min>]\tskip records with scores below this value.");
@@ -431,10 +492,18 @@
     Configuration conf = NutchConfiguration.create();
     for (int i = 1; i < args.length; i++) {
       if (args[i].equals("-stats")) {
-        dbr.processStatJob(crawlDb, conf);
+        boolean toSort = false;
+        if(i < args.length - 1 && "-sort".equals(args[i+1])){
+          toSort = true;
+          i++;
+        }
+        dbr.processStatJob(crawlDb, conf, toSort);
       } else if (args[i].equals("-dump")) {
         param = args[++i];
-        dbr.processDumpJob(crawlDb, param, conf);
+        String format = "normal";
+        if(i < args.length - 1 &&  "-format".equals(args[i+1]))
+          format = args[i=i+2];
+        dbr.processDumpJob(crawlDb, param, conf, "csv".equals(format)? CSV_FORMAT : STD_FORMAT );
       } else if (args[i].equals("-url")) {
         param = args[++i];
         dbr.readUrl(crawlDb, param, conf);

