Author: ab
Date: Mon Aug 14 10:54:17 2006
New Revision: 431389

URL: http://svn.apache.org/viewvc?rev=431389&view=rev
Log:
Use a CombiningCollector when calculating readdb -stats. This drastically
reduces the size of intermediate data, resulting in significant speedups
for large databases.
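The speedup comes from running the new CrawlDbStatCombiner on each map task's
local output, so the per-key counts are folded together before the shuffle.
Roughly, the stats job ends up wired as sketched below; this follows the
context lines of the patch and the old org.apache.hadoop.mapred API in use at
the time, and everything outside the patch context (job name, input path,
formats, key/value classes) is reconstructed here for illustration, not quoted
from the file:

    JobConf job = new NutchJob(config);
    job.setJobName("stats " + crawlDb);               // illustrative name

    job.addInputPath(new Path(crawlDb, "current"));   // crawldb data dir
    job.setInputFormat(SequenceFileInputFormat.class);
    job.setInputKeyClass(UTF8.class);
    job.setInputValueClass(CrawlDatum.class);

    job.setMapperClass(CrawlDbStatMapper.class);
    // The combiner is applied to each map task's output, so the many
    // ("T", 1), ("status N", 1) and ("retry N", 1) records emitted per
    // split collapse into one partial sum per key before they are written
    // and shuffled; this is what shrinks the intermediate data.
    job.setCombinerClass(CrawlDbStatCombiner.class);
    job.setReducerClass(CrawlDbStatReducer.class);

    job.setOutputPath(tmpFolder);
    job.setOutputFormat(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(UTF8.class);
    job.setOutputValueClass(LongWritable.class);

    JobClient.runJob(job);

Note that the mapper's raw score key "s" never reaches the reducer as such:
the combiner translates it into the scn/scx/sct keys that the reducer and the
final reading loop understand.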
Modified:
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java?rev=431389&r1=431388&r2=431389&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java Mon Aug 14 10:54:17 2006
@@ -82,15 +82,49 @@
   }
 
   public static class CrawlDbStatMapper implements Mapper {
+    LongWritable COUNT_1 = new LongWritable(1);
     public void configure(JobConf job) {}
     public void close() {}
     public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter)
             throws IOException {
       CrawlDatum cd = (CrawlDatum) value;
-      output.collect(new UTF8("TOTAL urls"), new LongWritable(1));
-      output.collect(new UTF8("status"), new LongWritable(cd.getStatus()));
-      output.collect(new UTF8("retry"), new LongWritable(cd.getRetriesSinceFetch()));
-      output.collect(new UTF8("score"), new LongWritable((long) (cd.getScore() * 1000.0)));
+      output.collect(new UTF8("T"), COUNT_1);
+      output.collect(new UTF8("status " + cd.getStatus()), COUNT_1);
+      output.collect(new UTF8("retry " + cd.getRetriesSinceFetch()), COUNT_1);
+      output.collect(new UTF8("s"), new LongWritable((long) (cd.getScore() * 1000.0)));
+    }
+  }
+
+  public static class CrawlDbStatCombiner implements Reducer {
+    LongWritable val = new LongWritable();
+
+    public CrawlDbStatCombiner() { }
+    public void configure(JobConf job) { }
+    public void close() {}
+    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
+        throws IOException {
+      val.set(0L);
+      String k = ((UTF8)key).toString();
+      if (!k.equals("s")) {
+        while (values.hasNext()) {
+          LongWritable cnt = (LongWritable)values.next();
+          val.set(val.get() + cnt.get());
+        }
+        output.collect(key, val);
+      } else {
+        long total = 0;
+        long min = Long.MAX_VALUE;
+        long max = Long.MIN_VALUE;
+        while (values.hasNext()) {
+          LongWritable cnt = (LongWritable)values.next();
+          if (cnt.get() < min) min = cnt.get();
+          if (cnt.get() > max) max = cnt.get();
+          total += cnt.get();
+        }
+        output.collect(new UTF8("scn"), new LongWritable(min));
+        output.collect(new UTF8("scx"), new LongWritable(max));
+        output.collect(new UTF8("sct"), new LongWritable(total));
+      }
     }
   }
 
@@ -101,7 +135,7 @@
             throws IOException {
 
       String k = ((UTF8) key).toString();
-      if (k.equals("TOTAL urls")) {
+      if (k.equals("T")) {
         // sum all values for this key
         long sum = 0;
         while (values.hasNext()) {
@@ -109,41 +143,34 @@
         }
         // output sum
        output.collect(key, new LongWritable(sum));
-      } else if (k.equals("status") || k.equals("retry")) {
-        TreeMap stats = new TreeMap();
+      } else if (k.startsWith("status") || k.startsWith("retry")) {
+        LongWritable cnt = new LongWritable();
+        while (values.hasNext()) {
+          LongWritable val = (LongWritable)values.next();
+          cnt.set(cnt.get() + val.get());
+        }
+        output.collect(key, cnt);
+      } else if (k.equals("scx")) {
+        LongWritable cnt = new LongWritable(Long.MIN_VALUE);
+        while (values.hasNext()) {
+          LongWritable val = (LongWritable)values.next();
+          if (cnt.get() < val.get()) cnt.set(val.get());
+        }
+        output.collect(key, cnt);
+      } else if (k.equals("scn")) {
+        LongWritable cnt = new LongWritable(Long.MAX_VALUE);
         while (values.hasNext()) {
-          long val = ((LongWritable) values.next()).get();
-          LongWritable cnt = (LongWritable) stats.get(k + " " + val);
-          if (cnt == null) {
-            cnt = new LongWritable();
-            stats.put(k + " " + val, cnt);
-          }
-          cnt.set(cnt.get() + 1);
-        }
-        Iterator it = stats.keySet().iterator();
-        while (it.hasNext()) {
-          String s = (String) it.next();
-          output.collect(new UTF8(s), (LongWritable) stats.get(s));
-        }
-      } else if (k.equals("score")) {
-        long min = 0, max = 0, total = 0;
-        int cnt = 0;
-        boolean first = true;
-        while (values.hasNext()) {
-          long val = ((LongWritable) values.next()).get();
-          if (first) {
-            min = val;
-            max = val;
-            first = false;
-          }
-          if (val < min) min = val;
-          if (val > max) max = val;
-          total += val;
-          cnt++;
-        }
-        output.collect(new UTF8("max score"), new LongWritable(max));
-        output.collect(new UTF8("min score"), new LongWritable(min));
-        output.collect(new UTF8("avg score"), new LongWritable(total / cnt));
+          LongWritable val = (LongWritable)values.next();
+          if (cnt.get() > val.get()) cnt.set(val.get());
+        }
+        output.collect(key, cnt);
+      } else if (k.equals("sct")) {
+        LongWritable cnt = new LongWritable();
+        while (values.hasNext()) {
+          LongWritable val = (LongWritable)values.next();
+          cnt.set(cnt.get() + val.get());
+        }
+        output.collect(key, cnt);
       }
     }
   }
@@ -221,6 +248,7 @@
     job.setInputValueClass(CrawlDatum.class);
 
     job.setMapperClass(CrawlDbStatMapper.class);
+    job.setCombinerClass(CrawlDbStatCombiner.class);
     job.setReducerClass(CrawlDbStatReducer.class);
 
     job.setOutputPath(tmpFolder);
@@ -238,7 +266,6 @@
     LongWritable value = new LongWritable();
 
     TreeMap stats = new TreeMap();
-    int avg = 0, min = 0, max = 0;
    for (int i = 0; i < readers.length; i++) {
      SequenceFile.Reader reader = readers[i];
      while (reader.next(key, value)) {
@@ -246,31 +273,35 @@
        LongWritable val = (LongWritable) stats.get(k);
        if (val == null) {
          val = new LongWritable();
+          if (k.equals("scx")) val.set(Long.MIN_VALUE);
+          if (k.equals("scn")) val.set(Long.MAX_VALUE);
          stats.put(k, val);
        }
-        val.set(val.get() + value.get());
-        if (k.startsWith("max"))
-          max++;
-        else if (k.startsWith("min"))
-          min++;
-        else if (k.startsWith("avg")) avg++;
+        if (k.equals("scx")) {
+          if (val.get() < value.get()) val.set(value.get());
+        } else if (k.equals("scn")) {
+          if (val.get() > value.get()) val.set(value.get());
+        } else {
+          val.set(val.get() + value.get());
+        }
      }
    }
 
    if (LOG.isInfoEnabled()) {
      LOG.info("Statistics for CrawlDb: " + crawlDb);
+      LongWritable totalCnt = (LongWritable)stats.get("T");
+      stats.remove("T");
+      LOG.info("TOTAL urls:\t" + totalCnt.get());
      Iterator it = stats.keySet().iterator();
      while (it.hasNext()) {
        String k = (String) it.next();
        LongWritable val = (LongWritable) stats.get(k);
-        if (k.indexOf("score") != -1) {
-          if (k.startsWith("min")) {
-            LOG.info(k + ":\t" + (float) ((float) (val.get() / min) / 1000.0f));
-          } else if (k.startsWith("max")) {
-            LOG.info(k + ":\t" + (float) ((float) (val.get() / max) / 1000.0f));
-          } else if (k.startsWith("avg")) {
-            LOG.info(k + ":\t" + (float) ((float) (val.get() / avg) / 1000.0f));
-          }
+        if (k.equals("scn")) {
+          LOG.info("min score:\t" + (float) (val.get() / 1000.0f));
+        } else if (k.equals("scx")) {
+          LOG.info("max score:\t" + (float) (val.get() / 1000.0f));
+        } else if (k.equals("sct")) {
+          LOG.info("avg score:\t" + (float) ((float) (val.get() / (float)totalCnt.get()) / 1000.0f));
        } else if (k.startsWith("status")) {
          int code = Integer.parseInt(k.substring(k.indexOf(' ') + 1));
          LOG.info(k + " (" + CrawlDatum.statNames[code] + "):\t" + val);
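To make the key encoding above concrete: the mapper emits each score
multiplied by 1000 under "s", the combiner folds those values into scn
(minimum), scx (maximum) and sct (sum), and the reading loop divides by 1000
again, using the "T" count for the average. A small worked example with
made-up counter values (hypothetical numbers, not taken from this patch):

    // Worked example of how the scaled counters read back from the stats
    // job turn into the logged values. All numbers below are invented.
    public class StatScaleExample {
      public static void main(String[] args) {
        long totalCnt = 10000L;   // key "T":   number of urls
        long scn = 120L;          // key "scn": min of score * 1000
        long scx = 3750L;         // key "scx": max of score * 1000
        long sct = 2500000L;      // key "sct": sum of score * 1000

        System.out.println("min score:\t" + (float) (scn / 1000.0f));                      // 0.12
        System.out.println("max score:\t" + (float) (scx / 1000.0f));                      // 3.75
        System.out.println("avg score:\t" + (float) ((sct / (float) totalCnt) / 1000.0f)); // 0.25
      }
    }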