Author: ab
Date: Tue Jan 15 14:02:52 2008
New Revision: 612245

URL: http://svn.apache.org/viewvc?rev=612245&view=rev
Log:
CrawlDbReader: add some new stats + dump into a csv format
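For quick reference, the invocations this change adds (full usage strings appear in the diff below) are "CrawlDbReader <crawldb> -stats [-sort]" for a per-host status breakdown and "CrawlDbReader <crawldb> -dump <out_dir> -format csv" for the CSV dump; a minimal programmatic sketch of the same API follows the full diff.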
Modified:
    lucene/nutch/trunk/CHANGES.txt
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java

Modified: lucene/nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/CHANGES.txt?rev=612245&r1=612244&r2=612245&view=diff
==============================================================================
--- lucene/nutch/trunk/CHANGES.txt (original)
+++ lucene/nutch/trunk/CHANGES.txt Tue Jan 15 14:02:52 2008
@@ -184,6 +184,9 @@
 
 63. NUTCH-534 - SegmentMerger: add -normalize option (Emmanuel Joke via ab)
 
+64. NUTCH-528 - CrawlDbReader: add some new stats + dump into a CSV format
+    (Emmanuel Joke via ab)
+
 Release 0.9 - 2007-04-02
 

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java?rev=612245&r1=612244&r2=612245&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java Tue Jan 15 14:02:52 2008
@@ -17,7 +17,10 @@
 package org.apache.nutch.crawl;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.net.URL;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Random;
@@ -36,13 +39,13 @@
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapFileOutputFormat;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.OutputFormatBase;
+import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
@@ -51,8 +54,10 @@
 import org.apache.hadoop.mapred.lib.HashPartitioner;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Progressable;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.StringUtil;
 
 /**
  * Read utility for the CrawlDB.
@@ -64,6 +69,9 @@
 
   public static final Log LOG = LogFactory.getLog(CrawlDbReader.class);
 
+  public static final int STD_FORMAT = 0;
+  public static final int CSV_FORMAT = 1;
+
   private MapFile.Reader[] readers = null;
 
   private void openReaders(String crawlDb, Configuration config) throws IOException {
@@ -82,28 +90,88 @@
       }
     }
   }
+
+  public static class CrawlDatumCsvOutputFormat extends OutputFormatBase<Text,CrawlDatum> {
+    protected static class LineRecordWriter implements RecordWriter<Text,CrawlDatum> {
+      private DataOutputStream out;
+
+      public LineRecordWriter(DataOutputStream out) {
+        this.out = out;
+        try {
+          out.writeBytes("Url;Status code;Status name;Fetch Time;Modified Time;Retries since fetch;Retry interval;Score;Signature;Metadata\n");
+        } catch (IOException e) {}
+      }
+
+      public synchronized void write(Text key, CrawlDatum value) throws IOException {
+        out.writeByte('"');
+        out.writeBytes(key.toString());
+        out.writeByte('"');
+        out.writeByte(';');
+        out.writeBytes(Integer.toString(value.getStatus()));
+        out.writeByte(';');
+        out.writeByte('"');
+        out.writeBytes(CrawlDatum.getStatusName(value.getStatus()));
+        out.writeByte('"');
+        out.writeByte(';');
+        out.writeBytes(new Date(value.getFetchTime()).toString());
+        out.writeByte(';');
+        out.writeBytes(new Date(value.getModifiedTime()).toString());
+        out.writeByte(';');
+        out.writeBytes(Integer.toString(value.getRetriesSinceFetch()));
+        out.writeByte(';');
+        out.writeBytes(Float.toString(value.getFetchInterval()));
+        out.writeByte(';');
+        out.writeBytes(Float.toString((value.getFetchInterval() / FetchSchedule.SECONDS_PER_DAY)));
+        out.writeByte(';');
+        out.writeBytes(Float.toString(value.getScore()));
+        out.writeByte(';');
+        out.writeByte('"');
+        out.writeBytes(value.getSignature() != null ? StringUtil.toHexString(value.getSignature()): "null");
+        out.writeByte('"');
+        out.writeByte('\n');
+      }
+
+      public synchronized void close(Reporter reporter) throws IOException {
+        out.close();
+      }
+    }
+
+    public RecordWriter<Text,CrawlDatum> getRecordWriter(FileSystem fs, JobConf job, String name,
+        Progressable progress) throws IOException {
+      Path dir = job.getOutputPath();
+      DataOutputStream fileOut = fs.create(new Path(dir, name), progress);
+      return new LineRecordWriter(fileOut);
+    }
+  }
 
-  public static class CrawlDbStatMapper implements Mapper {
+  public static class CrawlDbStatMapper implements Mapper<Text, CrawlDatum, Text, LongWritable> {
     LongWritable COUNT_1 = new LongWritable(1);
-    public void configure(JobConf job) {}
+    private boolean sort = false;
+    public void configure(JobConf job) {
+      sort = job.getBoolean("db.reader.stats.sort", false );
+    }
     public void close() {}
-    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter)
+    public void map(Text key, CrawlDatum value, OutputCollector<Text, LongWritable> output, Reporter reporter)
             throws IOException {
-      CrawlDatum cd = (CrawlDatum) value;
       output.collect(new Text("T"), COUNT_1);
-      output.collect(new Text("status " + cd.getStatus()), COUNT_1);
-      output.collect(new Text("retry " + cd.getRetriesSinceFetch()), COUNT_1);
-      output.collect(new Text("s"), new LongWritable((long) (cd.getScore() * 1000.0)));
+      output.collect(new Text("status " + value.getStatus()), COUNT_1);
+      output.collect(new Text("retry " + value.getRetriesSinceFetch()), COUNT_1);
+      output.collect(new Text("s"), new LongWritable((long) (value.getScore() * 1000.0)));
+      if(sort){
+        URL u = new URL(key.toString());
+        String host = u.getHost();
+        output.collect(new Text("status " + value.getStatus() + " " + host), COUNT_1);
+      }
     }
   }
 
-  public static class CrawlDbStatCombiner implements Reducer {
+  public static class CrawlDbStatCombiner implements Reducer<Text, LongWritable, Text, LongWritable> {
     LongWritable val = new LongWritable();
 
     public CrawlDbStatCombiner() { }
     public void configure(JobConf job) { }
     public void close() {}
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
+    public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter)
         throws IOException {
       val.set(0L);
       String k = ((Text)key).toString();
@@ -130,10 +198,10 @@
     }
   }
 
-  public static class CrawlDbStatReducer implements Reducer {
+  public static class CrawlDbStatReducer implements Reducer<Text, LongWritable, Text, LongWritable> {
     public void configure(JobConf job) {}
     public void close() {}
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
+    public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter)
             throws IOException {
 
       String k = ((Text) key).toString();
@@ -177,53 +245,39 @@
     }
   }
 
-  public static class CrawlDbDumpReducer implements Reducer {
-
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
-      while (values.hasNext()) {
-        output.collect(key, (Writable)values.next());
-      }
-    }
-
-    public void configure(JobConf job) {}
-    public void close() {}
-  }
-
-  public static class CrawlDbTopNMapper implements Mapper {
+  public static class CrawlDbTopNMapper implements Mapper<Text, CrawlDatum, FloatWritable, Text> {
     private static final FloatWritable fw = new FloatWritable();
     private float min = 0.0f;
     public void configure(JobConf job) {
-      long lmin = job.getLong("CrawlDbReader.topN.min", 0);
+      long lmin = job.getLong("db.reader.topn.min", 0);
       if (lmin != 0) {
         min = (float)lmin / 1000000.0f;
       }
     }
     public void close() {}
-    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter)
+    public void map(Text key, CrawlDatum value, OutputCollector<FloatWritable, Text> output, Reporter reporter)
             throws IOException {
-      CrawlDatum datum = (CrawlDatum)value;
-      if (datum.getScore() < min) return; // don't collect low-scoring records
-      fw.set(-datum.getScore()); // reverse sorting order
+      if (value.getScore() < min) return; // don't collect low-scoring records
+      fw.set(-value.getScore()); // reverse sorting order
      output.collect(fw, key); // invert mapping: score -> url
     }
   }
 
-  public static class CrawlDbTopNReducer implements Reducer {
+  public static class CrawlDbTopNReducer implements Reducer<FloatWritable, Text, FloatWritable, Text> {
     private long topN;
     private long count = 0L;
-    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+    public void reduce(FloatWritable key, Iterator<Text> values, OutputCollector<FloatWritable, Text> output, Reporter reporter) throws IOException {
       while (values.hasNext() && count < topN) {
-        FloatWritable fw = (FloatWritable)key;
-        fw.set(-fw.get());
-        output.collect(fw, (Writable)values.next());
+        key.set(-key.get());
+        output.collect(key, values.next());
         count++;
       }
     }
     public void configure(JobConf job) {
-      topN = job.getLong("CrawlDbReader.topN", 100) / job.getNumReduceTasks();
+      topN = job.getLong("db.reader.topn", 100) / job.getNumReduceTasks();
     }
     public void close() {}
@@ -233,7 +287,7 @@
     closeReaders();
   }
 
-  public void processStatJob(String crawlDb, Configuration config) throws IOException {
+  public void processStatJob(String crawlDb, Configuration config, boolean sort) throws IOException {
 
     if (LOG.isInfoEnabled()) {
       LOG.info("CrawlDb statistics start: " + crawlDb);
@@ -243,6 +297,7 @@
 
     JobConf job = new NutchJob(config);
     job.setJobName("stats " + crawlDb);
+    job.setBoolean("db.reader.stats.sort", sort);
 
     job.addInputPath(new Path(crawlDb, CrawlDb.CURRENT_NAME));
     job.setInputFormat(SequenceFileInputFormat.class);
@@ -303,8 +358,10 @@
         } else if (k.equals("sct")) {
           LOG.info("avg score:\t" + (float) ((((double)val.get()) / totalCnt.get()) / 1000.0));
         } else if (k.startsWith("status")) {
-          int code = Integer.parseInt(k.substring(k.indexOf(' ') + 1));
-          LOG.info(k + " (" + CrawlDatum.getStatusName((byte)code) + "):\t" + val);
+          String[] st = k.split(" ");
+          int code = Integer.parseInt(st[1]);
+          if(st.length >2 ) LOG.info(" " + st[2] +" :\t" + val);
+          else LOG.info(st[0] +" " +code + " (" + CrawlDatum.getStatusName((byte) code) + "):\t" + val);
         } else LOG.info(k + ":\t" + val);
       }
     }
@@ -332,7 +389,7 @@
     }
   }
 
-  public void processDumpJob(String crawlDb, String output, Configuration config) throws IOException {
+  public void processDumpJob(String crawlDb, String output, Configuration config, int format) throws IOException {
 
     if (LOG.isInfoEnabled()) {
       LOG.info("CrawlDb dump: starting");
@@ -348,7 +405,8 @@
     job.setInputFormat(SequenceFileInputFormat.class);
 
     job.setOutputPath(outFolder);
-    job.setOutputFormat(TextOutputFormat.class);
+    if(format == CSV_FORMAT) job.setOutputFormat(CrawlDatumCsvOutputFormat.class);
+    else job.setOutputFormat(TextOutputFormat.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(CrawlDatum.class);
@@ -382,7 +440,7 @@
     job.setOutputValueClass(Text.class);
 
     // XXX hmmm, no setFloat() in the API ... :(
-    job.setLong("CrawlDbReader.topN.min", Math.round(1000000.0 * min));
+    job.setLong("db.reader.topn.min", Math.round(1000000.0 * min));
     JobClient.runJob(job);
 
     if (LOG.isInfoEnabled()) {
@@ -390,7 +448,7 @@
     }
     job = new NutchJob(config);
     job.setJobName("topN collect " + crawlDb);
-    job.setLong("CrawlDbReader.topN", topN);
+    job.setLong("db.reader.topn", topN);
 
     job.addInputPath(tempDir);
     job.setInputFormat(SequenceFileInputFormat.class);
@@ -418,8 +476,11 @@
     if (args.length < 1) {
       System.err.println("Usage: CrawlDbReader <crawldb> (-stats | -dump <out_dir> | -topN <nnnn> <out_dir> [<min>] | -url <url>)");
       System.err.println("\t<crawldb>\tdirectory name where crawldb is located");
-      System.err.println("\t-stats\tprint overall statistics to System.out");
-      System.err.println("\t-dump <out_dir>\tdump the whole db to a text file in <out_dir>");
+      System.err.println("\t-stats [-sort] \tprint overall statistics to System.out");
+      System.err.println("\t\t[-sort]\tlist status sorted by host");
+      System.err.println("\t-dump <out_dir> [-format normal|csv ]\tdump the whole db to a text file in <out_dir>");
+      System.err.println("\t\t[-format csv]\tdump in Csv format");
+      System.err.println("\t\t[-format normal]\tdump in standard format (default option)");
       System.err.println("\t-url <url>\tprint information on <url> to System.out");
       System.err.println("\t-topN <nnnn> <out_dir> [<min>]\tdump top <nnnn> urls sorted by score to <out_dir>");
       System.err.println("\t\t[<min>]\tskip records with scores below this value.");
@@ -431,10 +492,18 @@
     Configuration conf = NutchConfiguration.create();
     for (int i = 1; i < args.length; i++) {
       if (args[i].equals("-stats")) {
-        dbr.processStatJob(crawlDb, conf);
+        boolean toSort = false;
+        if(i < args.length - 1 && "-sort".equals(args[i+1])){
+          toSort = true;
+          i++;
+        }
+        dbr.processStatJob(crawlDb, conf, toSort);
       } else if (args[i].equals("-dump")) {
         param = args[++i];
-        dbr.processDumpJob(crawlDb, param, conf);
+        String format = "normal";
+        if(i < args.length - 1 && "-format".equals(args[i+1]))
+          format = args[i=i+2];
+        dbr.processDumpJob(crawlDb, param, conf, "csv".equals(format)? CSV_FORMAT : STD_FORMAT );
       } else if (args[i].equals("-url")) {
         param = args[++i];
         dbr.readUrl(crawlDb, param, conf);
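
A minimal sketch (not part of the commit) of driving the patched API directly. The method signatures and the STD_FORMAT/CSV_FORMAT constants are taken from the diff above; the crawldb and output paths are hypothetical placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.nutch.crawl.CrawlDbReader;
    import org.apache.nutch.util.NutchConfiguration;

    public class CrawlDbReaderExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = NutchConfiguration.create();
        CrawlDbReader dbr = new CrawlDbReader();
        // New boolean argument: additionally emit per-host status counts
        // (propagated to the job as "db.reader.stats.sort").
        dbr.processStatJob("crawl/crawldb", conf, true);
        // New int argument: CSV_FORMAT selects CrawlDatumCsvOutputFormat,
        // STD_FORMAT keeps the previous TextOutputFormat behaviour.
        dbr.processDumpJob("crawl/crawldb", "crawl/dump_csv", conf,
            CrawlDbReader.CSV_FORMAT);
      }
    }

The same two calls are what main() dispatches to for "-stats [-sort]" and "-dump <out_dir> -format csv" in the argument-parsing hunk above.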