This is an automated email from the ASF dual-hosted git repository.

snagel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git
The following commit(s) were added to refs/heads/master by this push:
     new 8a663f9  Fix for NUTCH-1863: Add JSON format dump output to readdb command (#490)
8a663f9 is described below

commit 8a663f92e231bb72c7a106494dd1e30a68927d87
Author: Shashanka Balakuntala Srinivasa <shbalakunt...@gmail.com>
AuthorDate: Fri Dec 27 22:12:08 2019 +0530

    Fix for NUTCH-1863: Add JSON format dump output to readdb command (#490)
---
 src/java/org/apache/nutch/crawl/CrawlDbReader.java | 421 +++++++++++++--------
 1 file changed, 264 insertions(+), 157 deletions(-)

diff --git a/src/java/org/apache/nutch/crawl/CrawlDbReader.java b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
index f59f895..b9200e7 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDbReader.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -75,6 +76,12 @@ import org.apache.nutch.util.StringUtil;
 import org.apache.nutch.util.TimingUtil;
 import org.apache.commons.jexl2.Expression;
 
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+
 /**
  * Read utility for the CrawlDB.
  *
@@ -96,7 +103,8 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
       throws IOException {
     Path crawlDbPath = new Path(crawlDb, CrawlDb.CURRENT_NAME);
 
-    FileStatus stat = crawlDbPath.getFileSystem(config).getFileStatus(crawlDbPath);
+    FileStatus stat = crawlDbPath.getFileSystem(config)
+        .getFileStatus(crawlDbPath);
     long lastModified = stat.getModificationTime();
 
     synchronized (this) {
@@ -128,16 +136,33 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
     readers = null;
   }
 
-  public static class CrawlDatumCsvOutputFormat extends
-      FileOutputFormat<Text, CrawlDatum> {
-    protected static class LineRecordWriter extends
-        RecordWriter<Text, CrawlDatum> {
+  @SuppressWarnings("serial")
+  public static class JsonIndenter extends MinimalPrettyPrinter {
+
+    @Override
+    public void writeObjectFieldValueSeparator(JsonGenerator jg)
+        throws IOException, JsonGenerationException {
+      jg.writeRaw(": ");
+    }
+
+    @Override
+    public void writeObjectEntrySeparator(JsonGenerator jg)
+        throws IOException, JsonGenerationException {
+      jg.writeRaw(", ");
+    }
+  }
+
+  public static class CrawlDatumCsvOutputFormat
+      extends FileOutputFormat<Text, CrawlDatum> {
+    protected static class LineRecordWriter
+        extends RecordWriter<Text, CrawlDatum> {
       private DataOutputStream out;
 
       public LineRecordWriter(DataOutputStream out) {
         this.out = out;
         try {
-          out.writeBytes("Url,Status code,Status name,Fetch Time,Modified Time,Retries since fetch,Retry interval seconds,Retry interval days,Score,Signature,Metadata\n");
+          out.writeBytes(
+              "Url,Status code,Status name,Fetch Time,Modified Time,Retries since fetch,Retry interval seconds,Retry interval days,Score,Signature,Metadata\n");
         } catch (IOException e) {
         }
       }
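Note: JsonIndenter above overrides Jackson's MinimalPrettyPrinter so that each record stays on a single line while the object separators get a trailing space. A minimal sketch of the effect (assumes Jackson 2.x on the classpath; the demo class name and map contents are illustrative only, not part of this patch):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.nutch.crawl.CrawlDbReader;

    public class JsonIndenterDemo {
      public static void main(String[] args) throws Exception {
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("url", "http://example.com/");
        data.put("statusCode", 2);
        // default compact output:  {"url":"http://example.com/","statusCode":2}
        // with JsonIndenter:       {"url": "http://example.com/", "statusCode": 2}
        System.out.println(new ObjectMapper()
            .writer(new CrawlDbReader.JsonIndenter()).writeValueAsString(data));
      }
    }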
@@ -162,13 +187,15 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
         out.writeByte(',');
         out.writeBytes(Float.toString(value.getFetchInterval()));
         out.writeByte(',');
-        out.writeBytes(Float.toString((value.getFetchInterval() / FetchSchedule.SECONDS_PER_DAY)));
+        out.writeBytes(Float.toString(
+            (value.getFetchInterval() / FetchSchedule.SECONDS_PER_DAY)));
         out.writeByte(',');
         out.writeBytes(Float.toString(value.getScore()));
         out.writeByte(',');
         out.writeByte('"');
-        out.writeBytes(value.getSignature() != null ? StringUtil
-            .toHexString(value.getSignature()) : "null");
+        out.writeBytes(value.getSignature() != null
+            ? StringUtil.toHexString(value.getSignature())
+            : "null");
         out.writeByte('"');
         out.writeByte(',');
         out.writeByte('"');
@@ -185,13 +212,77 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
         out.writeByte('\n');
       }
 
-      public synchronized void close(TaskAttemptContext context) throws IOException {
+      public synchronized void close(TaskAttemptContext context)
+          throws IOException {
+        out.close();
+      }
+    }
+
+    public RecordWriter<Text, CrawlDatum> getRecordWriter(
+        TaskAttemptContext context) throws IOException {
+      String name = getUniqueFile(context, "part", "");
+      Path dir = FileOutputFormat.getOutputPath(context);
+      FileSystem fs = dir.getFileSystem(context.getConfiguration());
+      DataOutputStream fileOut = fs.create(new Path(dir, name), context);
+      return new LineRecordWriter(fileOut);
+    }
+  }
+
+  public static class CrawlDatumJsonOutputFormat
+      extends FileOutputFormat<Text, CrawlDatum> {
+    protected static class LineRecordWriter
+        extends RecordWriter<Text, CrawlDatum> {
+      private DataOutputStream out;
+      private ObjectMapper jsonMapper = new ObjectMapper();
+      private ObjectWriter jsonWriter;
+
+      public LineRecordWriter(DataOutputStream out) {
+        this.out = out;
+        jsonMapper.getFactory()
+            .configure(JsonGenerator.Feature.ESCAPE_NON_ASCII, true);
+        jsonWriter = jsonMapper.writer(new JsonIndenter());
+      }
+
+      public synchronized void write(Text key, CrawlDatum value)
+          throws IOException {
+        Map<String, Object> data = new LinkedHashMap<String, Object>();
+        data.put("url", key.toString());
+        data.put("statusCode", value.getStatus());
+        data.put("statusName", CrawlDatum.getStatusName(value.getStatus()));
+        data.put("fetchTime", new Date(value.getFetchTime()).toString());
+        data.put("modifiedTime", new Date(value.getModifiedTime()).toString());
+        data.put("retriesSinceFetch", value.getRetriesSinceFetch());
+        data.put("retryIntervalSeconds", value.getFetchInterval());
+        data.put("retryIntervalDays", (value.getFetchInterval() / FetchSchedule.SECONDS_PER_DAY));
+        data.put("score", value.getScore());
+        data.put("signature",
+            (value.getSignature() != null
+                ? StringUtil.toHexString(value.getSignature())
+                : "null"));
+        Map<String, Object> metaData = null;
+        if (value.getMetaData() != null) {
+          metaData = new LinkedHashMap<String, Object>();
+          for (Entry<Writable, Writable> e : value.getMetaData().entrySet()) {
+            metaData.put(e.getKey().toString(), e.getValue());
+          }
+        }
+        if (metaData != null) {
+          data.put("metadata", metaData);
+        } else {
+          data.put("metadata", "");
+        }
+        out.write(jsonWriter.writeValueAsBytes(data));
+        out.writeByte('\n');
+      }
+
+      public synchronized void close(TaskAttemptContext context)
+          throws IOException {
         out.close();
       }
     }
 
-    public RecordWriter<Text, CrawlDatum> getRecordWriter(TaskAttemptContext
-        context) throws IOException {
+    public RecordWriter<Text, CrawlDatum> getRecordWriter(
+        TaskAttemptContext context) throws IOException {
       String name = getUniqueFile(context, "part", "");
       Path dir = FileOutputFormat.getOutputPath(context);
       FileSystem fs = dir.getFileSystem(context.getConfiguration());
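Note: each CrawlDatum dumped through CrawlDatumJsonOutputFormat becomes one JSON object per line, with the keys populated in write() above. An illustrative record (all values made up; dates use the JDK's Date.toString() form, and "metadata" is the empty string when the datum carries none):

    {"url": "http://example.com/", "statusCode": 2, "statusName": "db_fetched", "fetchTime": "Fri Dec 27 22:12:08 IST 2019", "modifiedTime": "Thu Jan 01 05:30:00 IST 1970", "retriesSinceFetch": 0, "retryIntervalSeconds": 2592000, "retryIntervalDays": 30, "score": 1.0, "signature": "20af30f4e24dfb06c52cf337208041db", "metadata": ""}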
@@ -200,13 +291,14 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
     }
   }
 
-  public static class CrawlDbStatMapper extends
-      Mapper<Text, CrawlDatum, Text, NutchWritable> {
+  public static class CrawlDbStatMapper
+      extends Mapper<Text, CrawlDatum, Text, NutchWritable> {
     NutchWritable COUNT_1 = new NutchWritable(new LongWritable(1));
     private boolean sort = false;
 
     @Override
-    public void setup(Mapper<Text, CrawlDatum, Text, NutchWritable>.Context context) {
+    public void setup(
+        Mapper<Text, CrawlDatum, Text, NutchWritable>.Context context) {
       Configuration conf = context.getConfiguration();
       sort = conf.getBoolean("db.reader.stats.sort", false);
     }
@@ -216,8 +308,7 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
         throws IOException, InterruptedException {
       context.write(new Text("T"), COUNT_1);
       context.write(new Text("status " + value.getStatus()), COUNT_1);
-      context.write(new Text("retry " + value.getRetriesSinceFetch()),
-          COUNT_1);
+      context.write(new Text("retry " + value.getRetriesSinceFetch()), COUNT_1);
 
       if (Float.isNaN(value.getScore())) {
         context.write(new Text("scNaN"), COUNT_1);
@@ -236,7 +327,8 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
       context.write(new Text("ftt"), fetchTime);
 
       // fetch interval (in seconds)
-      NutchWritable fetchInterval = new NutchWritable(new LongWritable(value.getFetchInterval()));
+      NutchWritable fetchInterval = new NutchWritable(
+          new LongWritable(value.getFetchInterval()));
       context.write(new Text("fi"), fetchInterval);
       context.write(new Text("fit"), fetchInterval);
 
@@ -249,15 +341,15 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
     }
   }
 
-  public static class CrawlDbStatReducer extends
-      Reducer<Text, NutchWritable, Text, NutchWritable> {
-    public void setup(Reducer<Text, NutchWritable, Text, NutchWritable>.Context context) {
+  public static class CrawlDbStatReducer
+      extends Reducer<Text, NutchWritable, Text, NutchWritable> {
+    public void setup(
+        Reducer<Text, NutchWritable, Text, NutchWritable>.Context context) {
     }
 
     @Override
     public void reduce(Text key, Iterable<NutchWritable> values,
-        Context context)
-        throws IOException, InterruptedException {
+        Context context) throws IOException, InterruptedException {
       String k = key.toString();
       if (k.equals("T") || k.startsWith("status") || k.startsWith("retry")
           || k.equals("ftt") || k.equals("fit")) {
@@ -334,20 +426,20 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
     }
   }
 
-  public static class CrawlDbTopNMapper extends
-      Mapper<Text, CrawlDatum, FloatWritable, Text> {
+  public static class CrawlDbTopNMapper
+      extends Mapper<Text, CrawlDatum, FloatWritable, Text> {
     private static final FloatWritable fw = new FloatWritable();
     private float min = 0.0f;
 
     @Override
-    public void setup(Mapper<Text, CrawlDatum, FloatWritable, Text>.Context context) {
+    public void setup(
+        Mapper<Text, CrawlDatum, FloatWritable, Text>.Context context) {
       Configuration conf = context.getConfiguration();
       min = conf.getFloat("db.reader.topn.min", 0.0f);
     }
 
     @Override
-    public void map(Text key, CrawlDatum value,
-        Context context)
+    public void map(Text key, CrawlDatum value, Context context)
         throws IOException, InterruptedException {
       if (value.getScore() < min)
         return; // don't collect low-scoring records
@@ -356,15 +448,14 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
     }
   }
 
-  public static class CrawlDbTopNReducer extends
-      Reducer<FloatWritable, Text, FloatWritable, Text> {
+  public static class CrawlDbTopNReducer
+      extends Reducer<FloatWritable, Text, FloatWritable, Text> {
     private long topN;
     private long count = 0L;
 
     @Override
    public void reduce(FloatWritable key, Iterable<Text> values,
-        Context context)
-        throws IOException, InterruptedException {
+        Context context) throws IOException, InterruptedException {
       for (Text value : values) {
         if (count < topN) {
           key.set(-key.get());
@@ -375,9 +466,11 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
     }
 
     @Override
-    public void setup(Reducer<FloatWritable, Text, FloatWritable, Text>.Context context) {
+    public void setup(
+        Reducer<FloatWritable, Text, FloatWritable, Text>.Context context) {
       Configuration conf = context.getConfiguration();
-      topN = conf.getLong("db.reader.topn", 100) / Integer.parseInt(conf.get("mapreduce.job.reduces"));
+      topN = conf.getLong("db.reader.topn", 100)
+          / Integer.parseInt(conf.get("mapreduce.job.reduces"));
     }
   }
 
@@ -385,30 +478,32 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
     closeReaders();
   }
 
-  private TreeMap<String, Writable> processStatJobHelper(String crawlDb, Configuration config, boolean sort)
-      throws IOException, InterruptedException, ClassNotFoundException{
-    Path tmpFolder = new Path(crawlDb, "stat_tmp" + System.currentTimeMillis());
+  private TreeMap<String, Writable> processStatJobHelper(String crawlDb,
+      Configuration config, boolean sort)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Path tmpFolder = new Path(crawlDb, "stat_tmp" + System.currentTimeMillis());
 
-      Job job = NutchJob.getInstance(config);
-      config = job.getConfiguration();
-      job.setJobName("stats " + crawlDb);
-      config.setBoolean("db.reader.stats.sort", sort);
+    Job job = NutchJob.getInstance(config);
+    config = job.getConfiguration();
+    job.setJobName("stats " + crawlDb);
+    config.setBoolean("db.reader.stats.sort", sort);
 
-      FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
-      job.setInputFormatClass(SequenceFileInputFormat.class);
+    FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
+    job.setInputFormatClass(SequenceFileInputFormat.class);
 
-      job.setJarByClass(CrawlDbReader.class);
-      job.setMapperClass(CrawlDbStatMapper.class);
-      job.setCombinerClass(CrawlDbStatReducer.class);
-      job.setReducerClass(CrawlDbStatReducer.class);
+    job.setJarByClass(CrawlDbReader.class);
+    job.setMapperClass(CrawlDbStatMapper.class);
+    job.setCombinerClass(CrawlDbStatReducer.class);
+    job.setReducerClass(CrawlDbStatReducer.class);
 
-      FileOutputFormat.setOutputPath(job, tmpFolder);
-      job.setOutputFormatClass(SequenceFileOutputFormat.class);
-      job.setOutputKeyClass(Text.class);
-      job.setOutputValueClass(NutchWritable.class);
+    FileOutputFormat.setOutputPath(job, tmpFolder);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(NutchWritable.class);
 
-      // https://issues.apache.org/jira/browse/NUTCH-1029
-      config.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+    // https://issues.apache.org/jira/browse/NUTCH-1029
+    config.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+        false);
     FileSystem fileSystem = tmpFolder.getFileSystem(config);
     try {
       boolean success = job.waitForCompletion(true);
@@ -427,38 +522,39 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
     }
 
     // reading the result
-      SequenceFile.Reader[] readers = SegmentReaderUtil.getReaders(tmpFolder, config);
-
-      Text key = new Text();
-      NutchWritable value = new NutchWritable();
-
-      TreeMap<String, Writable> stats = new TreeMap<>();
-      for (int i = 0; i < readers.length; i++) {
-        SequenceFile.Reader reader = readers[i];
-        while (reader.next(key, value)) {
-          String k = key.toString();
-          Writable val = stats.get(k);
-          if (val == null) {
-            stats.put(k, value.get());
-            continue;
-          }
-          if (k.equals("sc")) {
-            float min = Float.MAX_VALUE;
+    SequenceFile.Reader[] readers = SegmentReaderUtil.getReaders(tmpFolder,
+        config);
+
+    Text key = new Text();
+    NutchWritable value = new NutchWritable();
+
+    TreeMap<String, Writable> stats = new TreeMap<>();
+    for (int i = 0; i < readers.length; i++) {
+      SequenceFile.Reader reader = readers[i];
+      while (reader.next(key, value)) {
+        String k = key.toString();
+        Writable val = stats.get(k);
+        if (val == null) {
+          stats.put(k, value.get());
+          continue;
+        }
+        if (k.equals("sc")) {
+          float min = Float.MAX_VALUE;
           float max = Float.MIN_VALUE;
-            if (stats.containsKey("scn")) {
-              min = ((FloatWritable) stats.get("scn")).get();
-            } else {
-              min = ((FloatWritable) stats.get("sc")).get();
-            }
+          if (stats.containsKey("scn")) {
+            min = ((FloatWritable) stats.get("scn")).get();
+          } else {
+            min = ((FloatWritable) stats.get("sc")).get();
+          }
          if (stats.containsKey("scx")) {
            max = ((FloatWritable) stats.get("scx")).get();
          } else {
            max = ((FloatWritable) stats.get("sc")).get();
          }
-            float fvalue = ((FloatWritable) value.get()).get();
-            if (min > fvalue) {
-              min = fvalue;
-            }
+          float fvalue = ((FloatWritable) value.get()).get();
+          if (min > fvalue) {
+            min = fvalue;
+          }
          if (max < fvalue) {
            max = fvalue;
          }
@@ -488,17 +584,16 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
           }
           stats.put(k + "n", new LongWritable(min));
           stats.put(k + "x", new LongWritable(max));
-          } else if (k.equals("sct")) {
+        } else if (k.equals("sct")) {
           FloatWritable fvalue = (FloatWritable) value.get();
-          ((FloatWritable) val)
-              .set(((FloatWritable) val).get() + fvalue.get());
+          ((FloatWritable) val).set(((FloatWritable) val).get() + fvalue.get());
         } else if (k.equals("scd")) {
           MergingDigest tdigest = null;
           MergingDigest tdig = MergingDigest.fromBytes(
               ByteBuffer.wrap(((BytesWritable) value.get()).getBytes()));
           if (val instanceof BytesWritable) {
-            tdigest = MergingDigest.fromBytes(
-                ByteBuffer.wrap(((BytesWritable) val).getBytes()));
+            tdigest = MergingDigest
+                .fromBytes(ByteBuffer.wrap(((BytesWritable) val).getBytes()));
             tdigest.add(tdig);
           } else {
             tdigest = tdig;
           }
@@ -509,22 +604,21 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
           stats.put(k, new BytesWritable(tdigestBytes.array()));
         } else {
           LongWritable lvalue = (LongWritable) value.get();
-          ((LongWritable) val)
-              .set(((LongWritable) val).get() + lvalue.get());
+          ((LongWritable) val).set(((LongWritable) val).get() + lvalue.get());
-          }
-        }
-        reader.close();
-      }
+        }
+      }
+      reader.close();
+    }
 
     // remove score, fetch interval, and fetch time
     // (used for min/max calculation)
     stats.remove("sc");
     stats.remove("fi");
     stats.remove("ft");
-      // removing the tmp folder
-      fileSystem.delete(tmpFolder, true);
-      return stats;
+    // removing the tmp folder
+    fileSystem.delete(tmpFolder, true);
+    return stats;
   }
-
+
   public void processStatJob(String crawlDb, Configuration config,
       boolean sort)
       throws IOException, InterruptedException, ClassNotFoundException {
@@ -559,7 +653,8 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
     if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb statistics start: " + crawlDb);
    }
-    TreeMap<String, Writable> stats = processStatJobHelper(crawlDb, config, sort);
+    TreeMap<String, Writable> stats = processStatJobHelper(crawlDb, config,
+        sort);
 
     if (LOG.isInfoEnabled()) {
       LOG.info("Statistics for CrawlDb: " + crawlDb);
@@ -649,8 +744,8 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
     return 0;
   }
 
-  public void readUrl(String crawlDb, String url, Configuration config, StringBuilder output)
-      throws IOException {
+  public void readUrl(String crawlDb, String url, Configuration config,
+      StringBuilder output) throws IOException {
     CrawlDatum res = get(crawlDb, url, config);
     output.append("URL: " + url + "\n");
     if (res != null) {
@@ -663,7 +758,8 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
 
   public void processDumpJob(String crawlDb, String output,
       Configuration config, String format, String regex, String status,
-      Integer retry, String expr, Float sample) throws IOException, ClassNotFoundException, InterruptedException {
+      Integer retry, String expr, Float sample)
+      throws IOException, ClassNotFoundException, InterruptedException {
     if (LOG.isInfoEnabled()) {
       LOG.info("CrawlDb dump: starting");
       LOG.info("CrawlDb db: " + crawlDb);
@@ -683,6 +779,8 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
       job.setOutputFormatClass(CrawlDatumCsvOutputFormat.class);
     } else if (format.equals("crawldb")) {
       job.setOutputFormatClass(MapFileOutputFormat.class);
+    } else if (format.equals("json")) {
+      job.setOutputFormatClass(CrawlDatumJsonOutputFormat.class);
     } else {
       job.setOutputFormatClass(TextOutputFormat.class);
     }
@@ -724,8 +822,8 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
     }
   }
 
-  public static class CrawlDbDumpMapper extends
-      Mapper<Text, CrawlDatum, Text, CrawlDatum> {
+  public static class CrawlDbDumpMapper
+      extends Mapper<Text, CrawlDatum, Text, CrawlDatum> {
     Pattern pattern = null;
     Matcher matcher = null;
     String status = null;
@@ -734,14 +832,15 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
     float sample;
 
     @Override
-    public void setup(Mapper<Text, CrawlDatum, Text, CrawlDatum>.Context context) {
+    public void setup(
+        Mapper<Text, CrawlDatum, Text, CrawlDatum>.Context context) {
       Configuration config = context.getConfiguration();
       if (config.get("regex", null) != null) {
         pattern = Pattern.compile(config.get("regex"));
       }
       status = config.get("status", null);
       retry = config.getInt("retry", -1);
-
+
       if (config.get("expr", null) != null) {
        expr = JexlUtil.parseExpression(config.get("expr", null));
      }
@@ -749,8 +848,7 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
     }
 
     @Override
-    public void map(Text key, CrawlDatum value,
-        Context context)
+    public void map(Text key, CrawlDatum value, Context context)
         throws IOException, InterruptedException {
 
       // check sample
@@ -765,9 +863,8 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
       }
 
       // check status
-      if (status != null
-          && !status.equalsIgnoreCase(CrawlDatum.getStatusName(value
-              .getStatus())))
+      if (status != null && !status
+          .equalsIgnoreCase(CrawlDatum.getStatusName(value.getStatus())))
         return;
 
       // check regex
@@ -777,7 +874,7 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
           return;
         }
       }
-
+
       // check expr
       if (expr != null) {
         if (!value.evaluate(expr, key.toString())) {
@@ -790,8 +887,8 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
   }
 
   public void processTopNJob(String crawlDb, long topN, float min,
-      String output, Configuration config) throws IOException,
-      ClassNotFoundException, InterruptedException {
+      String output, Configuration config)
+      throws IOException, ClassNotFoundException, InterruptedException {
 
     if (LOG.isInfoEnabled()) {
       LOG.info("CrawlDb topN: starting (topN=" + topN + ", min=" + min + ")");
@@ -799,9 +896,9 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
     }
 
     Path outFolder = new Path(output);
-    Path tempDir = new Path(config.get("mapreduce.cluster.temp.dir", ".")
-        + "/readdb-topN-temp-"
-        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+    Path tempDir = new Path(
+        config.get("mapreduce.cluster.temp.dir", ".") + "/readdb-topN-temp-"
+            + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
 
     Job job = NutchJob.getInstance(config);
     job.setJobName("topN prepare " + crawlDb);
@@ -818,9 +915,9 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
     job.setOutputValueClass(Text.class);
 
     job.getConfiguration().setFloat("db.reader.topn.min", min);
-
-      FileSystem fs = tempDir.getFileSystem(config);
-      try{
+
+    FileSystem fs = tempDir.getFileSystem(config);
+    try {
       boolean success = job.waitForCompletion(true);
       if (!success) {
         String message = "CrawlDbReader job did not succeed, job status:"
@@ -856,7 +953,7 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
 
     job.setNumReduceTasks(1); // create a single file.
 
-    try{
+    try {
       boolean success = job.waitForCompletion(true);
       if (!success) {
         String message = "CrawlDbReader job did not succeed, job status:"
@@ -879,35 +976,38 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
 
   }
 
-
-  public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, Exception {
+  public int run(String[] args) throws IOException, InterruptedException,
+      ClassNotFoundException, Exception {
     @SuppressWarnings("resource")
     CrawlDbReader dbr = new CrawlDbReader();
 
     if (args.length < 2) {
-      System.err
-          .println("Usage: CrawlDbReader <crawldb> (-stats | -dump <out_dir> | -topN <nnnn> <out_dir> [<min>] | -url <url>)");
+      System.err.println(
+          "Usage: CrawlDbReader <crawldb> (-stats | -dump <out_dir> | -topN <nnnn> <out_dir> [<min>] | -url <url>)");
       System.err
          .println("\t<crawldb>\tdirectory name where crawldb is located");
       System.err
          .println("\t-stats [-sort] \tprint overall statistics to System.out");
      System.err.println("\t\t[-sort]\tlist status sorted by host");
-      System.err
-          .println("\t-dump <out_dir> [-format normal|csv|crawldb]\tdump the whole db to a text file in <out_dir>");
+      System.err.println(
+          "\t-dump <out_dir> [-format normal|csv|crawldb|json]\tdump the whole db to a text file in <out_dir>");
      System.err.println("\t\t[-format csv]\tdump in Csv format");
-      System.err
-          .println("\t\t[-format normal]\tdump in standard format (default option)");
+      System.err.println(
+          "\t\t[-format normal]\tdump in standard format (default option)");
      System.err.println("\t\t[-format crawldb]\tdump as CrawlDB");
+      System.err.println("\t\t[-format json]\tdump in JSON Lines format");
      System.err.println("\t\t[-regex <expr>]\tfilter records with expression");
      System.err.println("\t\t[-retry <num>]\tminimum retry count");
-      System.err
-          .println("\t\t[-status <status>]\tfilter records by CrawlDatum status");
-      System.err.println("\t\t[-expr <expr>]\tJexl expression to evaluate for this record");
-      System.err.println("\t\t[-sample <fraction>]\tOnly process a random sample with this ratio");
+      System.err.println(
+          "\t\t[-status <status>]\tfilter records by CrawlDatum status");
+      System.err.println(
+          "\t\t[-expr <expr>]\tJexl expression to evaluate for this record");
+      System.err.println(
+          "\t\t[-sample <fraction>]\tOnly process a random sample with this ratio");
      System.err
          .println("\t-url <url>\tprint information on <url> to System.out");
-      System.err
-          .println("\t-topN <nnnn> <out_dir> [<min>]\tdump top <nnnn> urls sorted by score to <out_dir>");
+      System.err.println(
+          "\t-topN <nnnn> <out_dir> [<min>]\tdump top <nnnn> urls sorted by score to <out_dir>");
      System.err
          .println("\t\t[<min>]\tskip records with scores below this value.");
      System.err.println("\t\t\tThis can significantly improve performance.");
@@ -954,14 +1054,15 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
        }
        if (args[j].equals("-expr")) {
          expr = args[++j];
-          i=i+2;
+          i = i + 2;
        }
        if (args[j].equals("-sample")) {
          sample = Float.parseFloat(args[++j]);
          i = i + 2;
        }
      }
-      dbr.processDumpJob(crawlDb, param, config, format, regex, status, retry, expr, sample);
+      dbr.processDumpJob(crawlDb, param, config, format, regex, status, retry,
+          expr, sample);
    } else if (args[i].equals("-url")) {
      param = args[++i];
      StringBuilder output = new StringBuilder();
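Note: as the usage text above shows, the new format is selected the same way as csv or crawldb, e.g. (paths are illustrative; readdb is the bin/nutch alias for CrawlDbReader):

    bin/nutch readdb crawl/crawldb -dump crawldb-json -format json

Each part file under crawldb-json then holds one JSON object per line, as in the sample record shown earlier.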
@@ -990,25 +1091,27 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
     }
     return 0;
   }
-
+
   public static void main(String[] args) throws Exception {
     int result = ToolRunner.run(NutchConfiguration.create(), new CrawlDbReader(),
         args);
     System.exit(result);
   }
 
-  public Object query(Map<String, String> args, Configuration conf, String type, String crawlId) throws Exception {
+  public Object query(Map<String, String> args, Configuration conf, String type,
+      String crawlId) throws Exception {
     Map<String, Object> results = new HashMap<>();
     String crawlDb = crawlId + "/crawldb";
-    if(type.equalsIgnoreCase("stats")){
+    if (type.equalsIgnoreCase("stats")) {
       boolean sort = false;
-      if(args.containsKey("sort")){
-        if(args.get("sort").equalsIgnoreCase("true"))
+      if (args.containsKey("sort")) {
+        if (args.get("sort").equalsIgnoreCase("true"))
          sort = true;
      }
-      TreeMap<String , Writable> stats = processStatJobHelper(crawlDb, NutchConfiguration.create(), sort);
+      TreeMap<String, Writable> stats = processStatJobHelper(crawlDb,
+          NutchConfiguration.create(), sort);
      LongWritable totalCnt = (LongWritable) stats.get("T");
      stats.remove("T");
      results.put("totalUrls", String.valueOf(totalCnt.get()));
@@ -1034,14 +1137,15 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
        } else if (k.startsWith("status")) {
          String[] st = k.split(" ");
          int code = Integer.parseInt(st[1]);
-          if (st.length > 2){
+          if (st.length > 2) {
            @SuppressWarnings("unchecked")
-            Map<String, Object> individualStatusInfo = (Map<String, Object>) statusMap.get(String.valueOf(code));
+            Map<String, Object> individualStatusInfo = (Map<String, Object>) statusMap
+                .get(String.valueOf(code));
            Map<String, String> hostValues;
-            if(individualStatusInfo.containsKey("hostValues")){
-              hostValues= (Map<String, String>) individualStatusInfo.get("hostValues");
-            }
-            else{
+            if (individualStatusInfo.containsKey("hostValues")) {
+              hostValues = (Map<String, String>) individualStatusInfo
+                  .get("hostValues");
+            } else {
              hostValues = new HashMap<>();
              individualStatusInfo.put("hostValues", hostValues);
            }
@@ -1049,7 +1153,8 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
          } else {
            Map<String, Object> individualStatusInfo = new HashMap<>();
 
-            individualStatusInfo.put("statusValue", CrawlDatum.getStatusName((byte) code));
+            individualStatusInfo.put("statusValue",
+                CrawlDatum.getStatusName((byte) code));
            individualStatusInfo.put("count", String.valueOf(val));
 
            statusMap.put(String.valueOf(code), individualStatusInfo);
@@ -1061,7 +1166,7 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
      results.put("status", statusMap);
      return results;
    }
-    if(type.equalsIgnoreCase("dump")){
+    if (type.equalsIgnoreCase("dump")) {
      String output = args.get("out_dir");
      String format = "normal";
      String regex = null;
@@ -1085,25 +1190,26 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
        expr = args.get("expr");
      }
      if (args.containsKey("sample")) {
-          sample = Float.parseFloat(args.get("sample"));
-        }
-      processDumpJob(crawlDb, output, conf, format, regex, status, retry, expr, sample);
-      File dumpFile = new File(output+"/part-00000");
-      return dumpFile;
+        sample = Float.parseFloat(args.get("sample"));
+      }
+      processDumpJob(crawlDb, output, conf, format, regex, status, retry, expr,
+          sample);
+      File dumpFile = new File(output + "/part-00000");
+      return dumpFile;
    }
    if (type.equalsIgnoreCase("topN")) {
      String output = args.get("out_dir");
      long topN = Long.parseLong(args.get("nnn"));
      float min = 0.0f;
-      if(args.containsKey("min")){
+      if (args.containsKey("min")) {
        min = Float.parseFloat(args.get("min"));
      }
      processTopNJob(crawlDb, topN, min, output, conf);
-      File dumpFile = new File(output+"/part-00000");
+      File dumpFile = new File(output + "/part-00000");
       return dumpFile;
     }
-    if(type.equalsIgnoreCase("url")){
+    if (type.equalsIgnoreCase("url")) {
       String url = args.get("url");
       CrawlDatum res = get(crawlDb, url, conf);
       results.put("status", res.getStatus());
@@ -1114,9 +1220,10 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
       results.put("score", res.getScore());
       results.put("signature", StringUtil.toHexString(res.getSignature()));
       Map<String, String> metadata = new HashMap<>();
-      if(res.getMetaData()!=null){
+      if (res.getMetaData() != null) {
         for (Entry<Writable, Writable> e : res.getMetaData().entrySet()) {
-          metadata.put(String.valueOf(e.getKey()), String.valueOf(e.getValue()));
+          metadata.put(String.valueOf(e.getKey()),
+              String.valueOf(e.getValue()));
         }
       }
       results.put("metadata", metadata);
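Note: since the dump is line-oriented JSON, it can be post-processed record by record. A minimal sketch (assumes Jackson 2.x; the part file name and path are assumptions, not fixed by this patch):

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.util.Map;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class ReadJsonDump {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        try (BufferedReader in = new BufferedReader(
            new FileReader("crawldb-json/part-r-00000"))) {
          String line;
          while ((line = in.readLine()) != null) {
            // each line is a self-contained JSON object
            Map<?, ?> record = mapper.readValue(line, Map.class);
            System.out.println(record.get("url") + " " + record.get("statusName"));
          }
        }
      }
    }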