Author: ab
Date: Mon Aug 14 10:54:17 2006
New Revision: 431389

URL: http://svn.apache.org/viewvc?rev=431389&view=rev
Log:
Use a CombiningCollector when calculating readdb -stats. This drastically
reduces the size of intermediate data, resulting in significant speedups
for large databases.

Modified:
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java?rev=431389&r1=431388&r2=431389&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java Mon Aug 14 10:54:17 2006
@@ -82,15 +82,49 @@
   }
 
   public static class CrawlDbStatMapper implements Mapper {
+    LongWritable COUNT_1 = new LongWritable(1);
     public void configure(JobConf job) {}
     public void close() {}
     public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter)
             throws IOException {
       CrawlDatum cd = (CrawlDatum) value;
-      output.collect(new UTF8("TOTAL urls"), new LongWritable(1));
-      output.collect(new UTF8("status"), new LongWritable(cd.getStatus()));
-      output.collect(new UTF8("retry"), new LongWritable(cd.getRetriesSinceFetch()));
-      output.collect(new UTF8("score"), new LongWritable((long) (cd.getScore() * 1000.0)));
+      output.collect(new UTF8("T"), COUNT_1);
+      output.collect(new UTF8("status " + cd.getStatus()), COUNT_1);
+      output.collect(new UTF8("retry " + cd.getRetriesSinceFetch()), COUNT_1);
+      output.collect(new UTF8("s"), new LongWritable((long) (cd.getScore() * 1000.0)));
+    }
+  }
+  
+  public static class CrawlDbStatCombiner implements Reducer {
+    LongWritable val = new LongWritable();
+    
+    public CrawlDbStatCombiner() { }
+    public void configure(JobConf job) { }
+    public void close() {}
+    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
+        throws IOException {
+      val.set(0L);
+      String k = ((UTF8)key).toString();
+      if (!k.equals("s")) {
+        while (values.hasNext()) {
+          LongWritable cnt = (LongWritable)values.next();
+          val.set(val.get() + cnt.get());
+        }
+        output.collect(key, val);
+      } else {
+        long total = 0;
+        long min = Long.MAX_VALUE;
+        long max = Long.MIN_VALUE;
+        while (values.hasNext()) {
+          LongWritable cnt = (LongWritable)values.next();
+          if (cnt.get() < min) min = cnt.get();
+          if (cnt.get() > max) max = cnt.get();
+          total += cnt.get();
+        }
+        output.collect(new UTF8("scn"), new LongWritable(min));
+        output.collect(new UTF8("scx"), new LongWritable(max));
+        output.collect(new UTF8("sct"), new LongWritable(total));
+      }
     }
   }
 
@@ -101,7 +135,7 @@
             throws IOException {
 
       String k = ((UTF8) key).toString();
-      if (k.equals("TOTAL urls")) {
+      if (k.equals("T")) {
         // sum all values for this key
         long sum = 0;
         while (values.hasNext()) {
@@ -109,41 +143,34 @@
         }
         // output sum
         output.collect(key, new LongWritable(sum));
-      } else if (k.equals("status") || k.equals("retry")) {
-        TreeMap stats = new TreeMap();
+      } else if (k.startsWith("status") || k.startsWith("retry")) {
+        LongWritable cnt = new LongWritable();
+        while (values.hasNext()) {
+          LongWritable val = (LongWritable)values.next();
+          cnt.set(cnt.get() + val.get());
+        }
+        output.collect(key, cnt);
+      } else if (k.equals("scx")) {
+        LongWritable cnt = new LongWritable(Long.MIN_VALUE);
+        while (values.hasNext()) {
+          LongWritable val = (LongWritable)values.next();
+          if (cnt.get() < val.get()) cnt.set(val.get());
+        }
+        output.collect(key, cnt);
+      } else if (k.equals("scn")) {
+        LongWritable cnt = new LongWritable(Long.MAX_VALUE);
         while (values.hasNext()) {
-          long val = ((LongWritable) values.next()).get();
-          LongWritable cnt = (LongWritable) stats.get(k + " " + val);
-          if (cnt == null) {
-            cnt = new LongWritable();
-            stats.put(k + " " + val, cnt);
-          }
-          cnt.set(cnt.get() + 1);
-        }
-        Iterator it = stats.keySet().iterator();
-        while (it.hasNext()) {
-          String s = (String) it.next();
-          output.collect(new UTF8(s), (LongWritable) stats.get(s));
-        }
-      } else if (k.equals("score")) {
-        long min = 0, max = 0, total = 0;
-        int cnt = 0;
-        boolean first = true;
-        while (values.hasNext()) {
-          long val = ((LongWritable) values.next()).get();
-          if (first) {
-            min = val;
-            max = val;
-            first = false;
-          }
-          if (val < min) min = val;
-          if (val > max) max = val;
-          total += val;
-          cnt++;
-        }
-        output.collect(new UTF8("max score"), new LongWritable(max));
-        output.collect(new UTF8("min score"), new LongWritable(min));
-        output.collect(new UTF8("avg score"), new LongWritable(total / cnt));
+          LongWritable val = (LongWritable)values.next();
+          if (cnt.get() > val.get()) cnt.set(val.get());
+        }
+        output.collect(key, cnt);
+      } else if (k.equals("sct")) {
+        LongWritable cnt = new LongWritable();
+        while (values.hasNext()) {
+          LongWritable val = (LongWritable)values.next();
+          cnt.set(cnt.get() + val.get());
+        }
+        output.collect(key, cnt);
       }
     }
   }
@@ -221,6 +248,7 @@
     job.setInputValueClass(CrawlDatum.class);
 
     job.setMapperClass(CrawlDbStatMapper.class);
+    job.setCombinerClass(CrawlDbStatCombiner.class);
     job.setReducerClass(CrawlDbStatReducer.class);
 
     job.setOutputPath(tmpFolder);
@@ -238,7 +266,6 @@
     LongWritable value = new LongWritable();
 
     TreeMap stats = new TreeMap();
-    int avg = 0, min = 0, max = 0;
     for (int i = 0; i < readers.length; i++) {
       SequenceFile.Reader reader = readers[i];
       while (reader.next(key, value)) {
@@ -246,31 +273,35 @@
         LongWritable val = (LongWritable) stats.get(k);
         if (val == null) {
           val = new LongWritable();
+          if (k.equals("scx")) val.set(Long.MIN_VALUE);
+          if (k.equals("scn")) val.set(Long.MAX_VALUE);
           stats.put(k, val);
         }
-        val.set(val.get() + value.get());
-        if (k.startsWith("max"))
-          max++;
-        else if (k.startsWith("min"))
-          min++;
-        else if (k.startsWith("avg")) avg++;
+        if (k.equals("scx")) {
+          if (val.get() < value.get()) val.set(value.get());
+        } else if (k.equals("scn")) {
+          if (val.get() > value.get()) val.set(value.get());          
+        } else {
+          val.set(val.get() + value.get());
+        }
       }
     }
     
     if (LOG.isInfoEnabled()) {
       LOG.info("Statistics for CrawlDb: " + crawlDb);
+      LongWritable totalCnt = (LongWritable)stats.get("T");
+      stats.remove("T");
+      LOG.info("TOTAL urls:\t" + totalCnt.get());
       Iterator it = stats.keySet().iterator();
       while (it.hasNext()) {
         String k = (String) it.next();
         LongWritable val = (LongWritable) stats.get(k);
-        if (k.indexOf("score") != -1) {
-          if (k.startsWith("min")) {
-            LOG.info(k + ":\t" + (float) ((float) (val.get() / min) / 1000.0f));
-          } else if (k.startsWith("max")) {
-            LOG.info(k + ":\t" + (float) ((float) (val.get() / max) / 1000.0f));
-          } else if (k.startsWith("avg")) {
-            LOG.info(k + ":\t" + (float) ((float) (val.get() / avg) / 1000.0f));
-          }
+        if (k.equals("scn")) {
+          LOG.info("min score:\t" + (float) (val.get() / 1000.0f));
+        } else if (k.equals("scx")) {
+          LOG.info("max score:\t" + (float) (val.get() / 1000.0f));
+        } else if (k.equals("sct")) {
+          LOG.info("avg score:\t" + (float) ((float) (val.get() / (float)totalCnt.get()) / 1000.0f));
         } else if (k.startsWith("status")) {
           int code = Integer.parseInt(k.substring(k.indexOf(' ') + 1));
           LOG.info(k + " (" + CrawlDatum.statNames[code] + "):\t" + val);



-------------------------------------------------------------------------
Using Tomcat but need to do more? Need to support web services, security?
Get stuff done quickly with pre-integrated technology to make your job easier
Download IBM WebSphere Application Server v.1.0.1 based on Apache Geronimo
http://sel.as-us.falkag.net/sel?cmd=lnk&kid=120709&bid=263057&dat=121642
_______________________________________________
Nutch-cvs mailing list
Nutch-cvs@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nutch-cvs

Reply via email to