This is an automated email from the ASF dual-hosted git repository. snagel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nutch.git
The following commit(s) were added to refs/heads/master by this push: new 5076430 NUTCH-2773 SegmentReader (-dump or -get): show HTML content as UTF-8 - if called with command-line flag `-recode` (or if property `segment.reader.content.recode` is true): try to recode the HTML page content to UTF-8 using the already detected charset - fix passing forward properties (-Dprop=val) to Hadoop job/tasks * always use same Hadoop Configuration * use single instance of SegmentReader for -get and -list * remove duplicating member and local variables new e9dd180 Merge pull request #501 from sebastian-nagel/NUTCH-2773-segment-reader-recode-html 5076430 is described below commit 50764304870517217f46940ca4cdb69ed37cfb58 Author: Sebastian Nagel <sna...@apache.org> AuthorDate: Fri Feb 22 22:54:46 2019 +0100 NUTCH-2773 SegmentReader (-dump or -get): show HTML content as UTF-8 - if called with command-line flag `-recode` (or if property `segment.reader.content.recode` is true): try to recode the HTML page content to UTF-8 using the already detected charset - fix passing forward properties (-Dprop=val) to Hadoop job/tasks * always use same Hadoop Configuration * use single instance of SegmentReader for -get and -list * remove duplicating member and local variables --- conf/nutch-default.xml | 13 ++ src/java/org/apache/nutch/protocol/Content.java | 18 ++- .../org/apache/nutch/segment/SegmentReader.java | 143 ++++++++++++--------- 3 files changed, 109 insertions(+), 65 deletions(-) diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml index 58db620..85d9933 100644 --- a/conf/nutch-default.xml +++ b/conf/nutch-default.xml @@ -1209,6 +1209,19 @@ </description> </property> +<!-- SegmentReader --> +<property> + <name>segment.reader.content.recode</name> + <value>false</value> + <description> + SegmentReader when dumping segments: If true try to recode content + of HTML documents from the original encoding to UTF-8. Note, this + property can be overwritten by SegmentReader command-line options. + </description> +</property> + + + <!-- any23 plugin properties --> <property> diff --git a/src/java/org/apache/nutch/protocol/Content.java b/src/java/org/apache/nutch/protocol/Content.java index c513159..e7016f0 100644 --- a/src/java/org/apache/nutch/protocol/Content.java +++ b/src/java/org/apache/nutch/protocol/Content.java @@ -21,6 +21,8 @@ import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.zip.InflaterInputStream; @@ -256,6 +258,20 @@ public final class Content implements Writable { } public String toString() { + return toString(StandardCharsets.UTF_8); + } + + public String toString(String charset) { + Charset c = StandardCharsets.UTF_8; + try { + c = Charset.forName(charset); + } catch(Exception e) { + // fall-back to utf-8 + }; + return toString(c); + } + + public String toString(Charset charset) { StringBuffer buffer = new StringBuffer(); buffer.append("Version: " + version + "\n"); @@ -264,7 +280,7 @@ public final class Content implements Writable { buffer.append("contentType: " + contentType + "\n"); buffer.append("metadata: " + metadata + "\n"); buffer.append("Content:\n"); - buffer.append(new String(content)); // try default encoding + buffer.append(new String(content, charset)); return buffer.toString(); diff --git a/src/java/org/apache/nutch/segment/SegmentReader.java b/src/java/org/apache/nutch/segment/SegmentReader.java index bcf99b8..f47a76d 100644 --- a/src/java/org/apache/nutch/segment/SegmentReader.java +++ b/src/java/org/apache/nutch/segment/SegmentReader.java @@ -25,6 +25,7 @@ import java.io.PrintStream; import java.io.PrintWriter; import java.io.Writer; import java.lang.invoke.MethodHandles; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -61,11 +62,13 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.NutchWritable; +import org.apache.nutch.metadata.Metadata; import org.apache.nutch.parse.ParseData; import org.apache.nutch.parse.ParseText; import org.apache.nutch.protocol.Content; import org.apache.nutch.util.HadoopFSUtil; import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; import org.apache.nutch.util.SegmentReaderUtil; /** Dump the content of a segment. */ @@ -74,12 +77,13 @@ public class SegmentReader extends Configured implements Tool { private static final Logger LOG = LoggerFactory .getLogger(MethodHandles.lookup().lookupClass()); - private boolean co; - private boolean fe; - private boolean ge; - private boolean pa; - private boolean pd; - private boolean pt; + private boolean co = true; + private boolean fe = true; + private boolean ge = true; + private boolean pa = true; + private boolean pd = true; + private boolean pt = true; + private boolean recodeContent = false; public static class InputCompatMapper extends Mapper<WritableComparable<?>, Writable, Text, NutchWritable> { @@ -103,6 +107,8 @@ public class SegmentReader extends Configured implements Tool { /** Implements a text output format */ public static class TextOutputFormat extends FileOutputFormat<WritableComparable<?>, Writable> { + + @Override public RecordWriter<WritableComparable<?>, Writable> getRecordWriter( TaskAttemptContext context) throws IOException, InterruptedException { String name = getUniqueFile(context, "part", ""); @@ -119,11 +125,14 @@ public class SegmentReader extends Configured implements Tool { final PrintStream printStream = new PrintStream( fs.create(segmentDumpFile), false, StandardCharsets.UTF_8.name()); return new RecordWriter<WritableComparable<?>, Writable>() { + + @Override public synchronized void write(WritableComparable<?> key, Writable value) throws IOException { printStream.println(value); } + @Override public synchronized void close(TaskAttemptContext context) throws IOException { printStream.close(); } @@ -131,38 +140,17 @@ public class SegmentReader extends Configured implements Tool { } } - public SegmentReader() { - super(null); - } - - public SegmentReader(Configuration conf, boolean co, boolean fe, boolean ge, - boolean pa, boolean pd, boolean pt) { - super(conf); - this.co = co; - this.fe = fe; - this.ge = ge; - this.pa = pa; - this.pd = pd; - this.pt = pt; - } - - public void setup(Job job) { - Configuration conf = job.getConfiguration(); - this.co = conf.getBoolean("segment.reader.co", true); - this.fe = conf.getBoolean("segment.reader.fe", true); - this.ge = conf.getBoolean("segment.reader.ge", true); - this.pa = conf.getBoolean("segment.reader.pa", true); - this.pd = conf.getBoolean("segment.reader.pd", true); - this.pt = conf.getBoolean("segment.reader.pt", true); - } - - public void close() { - } - public static class InputCompatReducer extends Reducer<Text, NutchWritable, Text, Text> { private long recNo = 0L; + private boolean recodeContent = false; + + @Override + public void setup(Context context) { + recodeContent = context.getConfiguration() + .getBoolean("segment.reader.content.recode", false); + } @Override public void reduce(Text key, Iterable<NutchWritable> values, @@ -171,20 +159,32 @@ public class SegmentReader extends Configured implements Tool { dump.append("\nRecno:: ").append(recNo++).append("\n"); dump.append("URL:: " + key.toString() + "\n"); + Content content = null; + Charset charset = null; for (NutchWritable val : values) { Writable value = val.get(); // unwrap if (value instanceof CrawlDatum) { dump.append("\nCrawlDatum::\n").append(((CrawlDatum) value).toString()); } else if (value instanceof Content) { - dump.append("\nContent::\n").append(((Content) value).toString()); + if (recodeContent) { + content = (Content) value; + } else { + dump.append("\nContent::\n").append(((Content) value).toString()); + } } else if (value instanceof ParseData) { dump.append("\nParseData::\n").append(((ParseData) value).toString()); + if (recodeContent) { + charset = getCharset(((ParseData) value).getParseMeta()); + } } else if (value instanceof ParseText) { dump.append("\nParseText::\n").append(((ParseText) value).toString()); } else if (LOG.isWarnEnabled()) { LOG.warn("Unrecognized type: " + value.getClass()); } } + if (recodeContent && content != null) { + dump.append("\nContent::\n").append(content.toString(charset)); + } context.write(key, new Text(dump.toString())); } } @@ -192,11 +192,9 @@ public class SegmentReader extends Configured implements Tool { public void dump(Path segment, Path output) throws IOException, InterruptedException, ClassNotFoundException { - if (LOG.isInfoEnabled()) { - LOG.info("SegmentReader: dump segment: " + segment); - } + LOG.info("SegmentReader: dump segment: {}", segment); - Job job = Job.getInstance(); + Job job = NutchJob.getInstance(getConf()); job.setJobName("read " + segment); Configuration conf = job.getConfiguration(); @@ -277,9 +275,7 @@ public class SegmentReader extends Configured implements Tool { } } fs.delete(tempDir, true); - if (LOG.isInfoEnabled()) { - LOG.info("SegmentReader: done"); - } + LOG.info("SegmentReader: done"); } /** Appends two files and updates the Recno counter */ @@ -306,7 +302,7 @@ public class SegmentReader extends Configured implements Tool { public void get(final Path segment, final Text key, Writer writer, final Map<String, List<Writable>> results) throws Exception { - LOG.info("SegmentReader: get '" + key + "'"); + LOG.info("SegmentReader: get '{}'", key); ArrayList<Thread> threads = new ArrayList<>(); if (co) threads.add(new Thread() { @@ -405,7 +401,13 @@ public class SegmentReader extends Configured implements Tool { if (res != null && res.size() > 0) { for (int k = 0; k < res.size(); k++) { writer.write(keys[i][1]); - writer.write(res.get(k) + "\n"); + if (recodeContent && keys[i][0].equals("co")) { + Charset charset = getCharset(((ParseData) results.get("pd").get(k)).getParseMeta()); + writer.write(((Content) res.get(k)).toString(charset)); + } else { + writer.write(res.get(k).toString()); + } + writer.write('\n'); } } writer.flush(); @@ -459,6 +461,22 @@ public class SegmentReader extends Configured implements Tool { return res; } + /** Try to get HTML encoding from parse metadata */ + public static Charset getCharset(Metadata parseMeta) { + Charset cs = StandardCharsets.UTF_8; + String charset = parseMeta.get(Metadata.CHAR_ENCODING_FOR_CONVERSION); + if (charset == null) { + // fall-back: "Content-Encoding" (set by parse-tika) + charset = parseMeta.get(Metadata.CONTENT_ENCODING); + } + try { + cs = Charset.forName(charset); + } catch (Exception e) { + // fall-back to utf-8 + } + return cs; + } + public static class SegmentReaderStats { public long start = -1L; public long end = -1L; @@ -579,6 +597,7 @@ public class SegmentReader extends Configured implements Tool { private static final int MODE_GET = 2; + @Override public int run(String[] args) throws Exception { if (args.length < 2) { usage(); @@ -592,12 +611,6 @@ public class SegmentReader extends Configured implements Tool { else if (args[0].equals("-get")) mode = MODE_GET; - boolean co = true; - boolean fe = true; - boolean ge = true; - boolean pa = true; - boolean pd = true; - boolean pt = true; // collect general options for (int i = 1; i < args.length; i++) { if (args[i].equals("-nocontent")) { @@ -618,22 +631,21 @@ public class SegmentReader extends Configured implements Tool { } else if (args[i].equals("-noparsetext")) { pt = false; args[i] = null; + } else if (args[i].equals("-recode")) { + recodeContent = true; + args[i] = null; } } - Configuration conf = NutchConfiguration.create(); - SegmentReader segmentReader = new SegmentReader(conf, co, fe, ge, pa, pd, - pt); + + if (recodeContent) { + LOG.info("Recoding charset of HTML content"); + getConf().setBoolean("segment.reader.content.recode", true); + } + // collect required args switch (mode) { case MODE_DUMP: - this.co = co; - this.fe = fe; - this.ge = ge; - this.pa = pa; - this.pd = pd; - this.pt = pt; - String input = args[1]; if (input == null) { System.err.println("Missing required argument: <segment_dir>"); @@ -655,7 +667,7 @@ public class SegmentReader extends Configured implements Tool { continue; if (args[i].equals("-dir")) { Path dir = new Path(args[++i]); - FileSystem fs = dir.getFileSystem(conf); + FileSystem fs = dir.getFileSystem(getConf()); FileStatus[] fstats = fs.listStatus(dir, HadoopFSUtil.getPassDirectoriesFilter(fs)); Path[] files = HadoopFSUtil.getPaths(fstats); @@ -665,7 +677,7 @@ public class SegmentReader extends Configured implements Tool { } else dirs.add(new Path(args[i])); } - segmentReader.list(dirs, new OutputStreamWriter(System.out, StandardCharsets.UTF_8)); + list(dirs, new OutputStreamWriter(System.out, StandardCharsets.UTF_8)); return 0; case MODE_GET: input = args[1]; @@ -680,8 +692,9 @@ public class SegmentReader extends Configured implements Tool { usage(); return -1; } - segmentReader.get(new Path(input), new Text(key), new OutputStreamWriter( - System.out, StandardCharsets.UTF_8), new HashMap<>()); + get(new Path(input), new Text(key), + new OutputStreamWriter(System.out, StandardCharsets.UTF_8), + new HashMap<>()); return 0; default: System.err.println("Invalid operation: " + args[0]); @@ -700,6 +713,8 @@ public class SegmentReader extends Configured implements Tool { System.err.println("\t-noparse\tignore crawl_parse directory"); System.err.println("\t-noparsedata\tignore parse_data directory"); System.err.println("\t-noparsetext\tignore parse_text directory"); + System.err.println("\t-recode \ttry to recode HTML content from the page's\n" + + "\t \toriginal charset to UTF-8\n"); System.err.println(); System.err .println("* SegmentReader -dump <segment_dir> <output> [general options]");