Author: dogacan Date: Tue Jul 17 08:16:40 2007 New Revision: 556946 URL: http://svn.apache.org/viewvc?view=rev&rev=556946 Log: NUTCH-506 - Delegate compression to Hadoop.
Modified: lucene/nutch/trunk/CHANGES.txt lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseOutputFormat.java lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseSegment.java lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseStatus.java lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseText.java lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java lucene/nutch/trunk/src/java/org/apache/nutch/protocol/ProtocolStatus.java lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java Modified: lucene/nutch/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/CHANGES.txt?view=diff&rev=556946&r1=556945&r2=556946 ============================================================================== --- lucene/nutch/trunk/CHANGES.txt (original) +++ lucene/nutch/trunk/CHANGES.txt Tue Jul 17 08:16:40 2007 @@ -89,6 +89,8 @@ 30. NUTCH-515 - Next fetch time is set incorrectly. (dogacan) +30. NUTCH-506 - Nutch should delegate compression to Hadoop. (dogacan) + Release 0.9 - 2007-04-02 1. 
Changed log4j configuration to log to stdout on commandline Modified: lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java?view=diff&rev=556946&r1=556945&r2=556946 ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java Tue Jul 17 08:16:40 2007 @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.MapFile; +import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Text; @@ -57,10 +58,12 @@ new Path(new Path(job.getOutputPath(), CrawlDatum.FETCH_DIR_NAME), name); final Path content = new Path(new Path(job.getOutputPath(), Content.DIR_NAME), name); + + final CompressionType compType = SequenceFile.getCompressionType(job); final MapFile.Writer fetchOut = new MapFile.Writer(job, fs, fetch.toString(), Text.class, CrawlDatum.class, - CompressionType.NONE, progress); + compType, progress); return new RecordWriter() { private MapFile.Writer contentOut; @@ -70,7 +73,7 @@ if (Fetcher.isStoringContent(job)) { contentOut = new MapFile.Writer(job, fs, content.toString(), Text.class, Content.class, - CompressionType.NONE, progress); + compType, progress); } if (Fetcher.isParsing(job)) { Modified: lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseOutputFormat.java URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseOutputFormat.java?view=diff&rev=556946&r1=556945&r2=556946 ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseOutputFormat.java (original) +++ 
lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseOutputFormat.java Tue Jul 17 08:16:40 2007 @@ -85,6 +85,7 @@ final float interval = job.getFloat("db.fetch.interval.default", 2592000.0f); final boolean ignoreExternalLinks = job.getBoolean("db.ignore.external.links", false); final int maxOutlinks = job.getInt("db.max.outlinks.per.page", 100); + final CompressionType compType = SequenceFile.getCompressionType(job); Path text = new Path(new Path(job.getOutputPath(), ParseText.DIR_NAME), name); @@ -99,11 +100,11 @@ final MapFile.Writer dataOut = new MapFile.Writer(job, fs, data.toString(), Text.class, ParseData.class, - CompressionType.RECORD, progress); + compType, progress); final SequenceFile.Writer crawlOut = SequenceFile.createWriter(fs, job, crawl, Text.class, CrawlDatum.class, - CompressionType.NONE, progress); + compType, progress); return new RecordWriter() { Modified: lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseSegment.java URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseSegment.java?view=diff&rev=556946&r1=556945&r2=556946 ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseSegment.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseSegment.java Tue Jul 17 08:16:40 2007 @@ -69,7 +69,6 @@ key = newKey; } Content content = (Content) value; - content.forceInflate(); ParseResult parseResult = null; try { Modified: lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseStatus.java URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseStatus.java?view=diff&rev=556946&r1=556945&r2=556946 ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseStatus.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseStatus.java Tue Jul 17 08:16:40 2007 @@ -25,7 +25,8 @@ 
import java.io.DataOutput; import java.io.IOException; -import org.apache.hadoop.io.VersionedWritable; +import org.apache.hadoop.io.VersionMismatchException; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.conf.Configuration; @@ -35,9 +36,9 @@ /** * @author Andrzej Bialecki <[EMAIL PROTECTED]> */ -public class ParseStatus extends VersionedWritable { +public class ParseStatus implements Writable { - private final static byte VERSION = 1; + private final static byte VERSION = 2; // Primary status codes: @@ -136,17 +137,32 @@ } public void readFields(DataInput in) throws IOException { - super.readFields(in); // check version - majorCode = in.readByte(); - minorCode = in.readShort(); - args = WritableUtils.readCompressedStringArray(in); - } + byte version = in.readByte(); + switch(version) { + case 1: + majorCode = in.readByte(); + minorCode = in.readShort(); + args = WritableUtils.readCompressedStringArray(in); + break; + case 2: + majorCode = in.readByte(); + minorCode = in.readShort(); + args = WritableUtils.readStringArray(in); + break; + default: + throw new VersionMismatchException(VERSION, version); + } + } public void write(DataOutput out) throws IOException { - super.write(out); // write out version + out.writeByte(VERSION); out.writeByte(majorCode); out.writeShort(minorCode); - WritableUtils.writeCompressedStringArray(out, args); + if (args == null) { + out.writeInt(-1); + } else { + WritableUtils.writeStringArray(out, args); + } } /** A convenience method. 
Returns true if majorCode is SUCCESS, false Modified: lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseText.java URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseText.java?view=diff&rev=556946&r1=556945&r2=556946 ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseText.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseText.java Tue Jul 17 08:16:40 2007 @@ -26,10 +26,10 @@ /* The text conversion of page's content, stored using gzip compression. * @see Parse#getText() */ -public final class ParseText extends VersionedWritable { +public final class ParseText implements Writable { public static final String DIR_NAME = "parse_text"; - private final static byte VERSION = 1; + private final static byte VERSION = 2; public ParseText() {} private String text; @@ -38,18 +38,23 @@ this.text = text; } - public byte getVersion() { return VERSION; } - public void readFields(DataInput in) throws IOException { - super.readFields(in); // check version - text = WritableUtils.readCompressedString(in); - return; + byte version = in.readByte(); + switch (version) { + case 1: + text = WritableUtils.readCompressedString(in); + break; + case VERSION: + text = Text.readString(in); + break; + default: + throw new VersionMismatchException(VERSION, version); + } } public final void write(DataOutput out) throws IOException { - super.write(out); // write version - WritableUtils.writeCompressedString(out, text); - return; + out.write(VERSION); + Text.writeString(out, text); } public final static ParseText read(DataInput in) throws IOException { Modified: lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java?view=diff&rev=556946&r1=556945&r2=556946 ============================================================================== 
--- lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java Tue Jul 17 08:16:40 2007 @@ -17,32 +17,35 @@ package org.apache.nutch.protocol; +import java.io.ByteArrayInputStream; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; import java.util.Arrays; +import java.util.zip.InflaterInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.ArrayFile; -import org.apache.hadoop.io.CompressedWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.UTF8; import org.apache.hadoop.io.VersionMismatchException; +import org.apache.hadoop.io.Writable; import org.apache.nutch.metadata.Metadata; import org.apache.nutch.util.NutchConfiguration; import org.apache.nutch.util.mime.MimeType; import org.apache.nutch.util.mime.MimeTypeException; import org.apache.nutch.util.mime.MimeTypes; -public final class Content extends CompressedWritable { +public final class Content implements Writable{ public static final String DIR_NAME = "content"; - private final static byte VERSION = 2; + private final static int VERSION = -1; - private byte version; + private int version; private String url; @@ -58,10 +61,8 @@ private MimeTypes mimeTypes; - private boolean inflated; - public Content() { - inflated = false; + metadata = new Metadata(); } public Content(String url, String base, byte[] content, String contentType, @@ -83,21 +84,11 @@ this.mimeTypeMagic = conf.getBoolean("mime.type.magic", true); this.mimeTypes = MimeTypes.get(conf.get("mime.types.file")); this.contentType = getContentType(contentType, url, content); - inflated = true; } - public void ensureInflated() { - if (inflated) { - return; - } - super.ensureInflated(); - inflated = true; - } - - protected final void readFieldsCompressed(DataInput in) throws 
IOException { - version = in.readByte(); - metadata = new Metadata(); - switch (version) { + private final void readFieldsCompressed(DataInput in) throws IOException { + byte oldVersion = in.readByte(); + switch (oldVersion) { case 0: case 1: url = UTF8.readString(in); // read url @@ -118,7 +109,7 @@ } } break; - case VERSION: + case 2: url = Text.readString(in); // read url base = Text.readString(in); // read base @@ -129,13 +120,41 @@ metadata.readFields(in); // read meta data break; default: - throw new VersionMismatchException(VERSION, version); + throw new VersionMismatchException((byte)2, oldVersion); } } + + public final void readFields(DataInput in) throws IOException { + int sizeOrVersion = in.readInt(); + if (sizeOrVersion < 0) { // version + version = sizeOrVersion; + switch (version) { + case VERSION: + url = Text.readString(in); + base = Text.readString(in); + + content = new byte[in.readInt()]; + in.readFully(content); + + contentType = Text.readString(in); + metadata.readFields(in); + break; + default: + throw new VersionMismatchException((byte)VERSION, (byte)version); + } + } else { // size + byte[] compressed = new byte[sizeOrVersion]; + in.readFully(compressed, 0, compressed.length); + ByteArrayInputStream deflated = new ByteArrayInputStream(compressed); + DataInput inflater = + new DataInputStream(new InflaterInputStream(deflated)); + readFieldsCompressed(inflater); + } + } - protected final void writeCompressed(DataOutput out) throws IOException { - out.writeByte(VERSION); + public final void write(DataOutput out) throws IOException { + out.writeInt(VERSION); Text.writeString(out, url); // write url Text.writeString(out, base); // write base @@ -160,7 +179,6 @@ /** The url fetched. */ public String getUrl() { - ensureInflated(); return url; } @@ -168,18 +186,15 @@ * Maybe be different from url if the request redirected. */ public String getBaseUrl() { - ensureInflated(); return base; } /** The binary content retrieved. 
*/ public byte[] getContent() { - ensureInflated(); return content; } public void setContent(byte[] content) { - ensureInflated(); this.content = content; } @@ -188,34 +203,28 @@ * http://www.iana.org/assignments/media-types/</a> */ public String getContentType() { - ensureInflated(); return contentType; } public void setContentType(String contentType) { - ensureInflated(); this.contentType = contentType; } /** Other protocol-specific data. */ public Metadata getMetadata() { - ensureInflated(); return metadata; } /** Other protocol-specific data. */ public void setMetadata(Metadata metadata) { - ensureInflated(); this.metadata = metadata; } public boolean equals(Object o) { - ensureInflated(); if (!(o instanceof Content)) { return false; } Content that = (Content) o; - that.ensureInflated(); return this.url.equals(that.url) && this.base.equals(that.base) && Arrays.equals(this.getContent(), that.getContent()) && this.contentType.equals(that.contentType) @@ -223,7 +232,6 @@ } public String toString() { - ensureInflated(); StringBuffer buffer = new StringBuffer(); buffer.append("Version: " + version + "\n"); @@ -296,13 +304,4 @@ } return typeName; } - - /** - * By calling this method caller forces the next access to any property (via - * getters and setters) to check if decompressing of data is really required. 
- */ - public void forceInflate() { - inflated = false; - } - } Modified: lucene/nutch/trunk/src/java/org/apache/nutch/protocol/ProtocolStatus.java URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/protocol/ProtocolStatus.java?view=diff&rev=556946&r1=556945&r2=556946 ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/protocol/ProtocolStatus.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/protocol/ProtocolStatus.java Tue Jul 17 08:16:40 2007 @@ -22,15 +22,16 @@ import java.io.IOException; import java.util.HashMap; -import org.apache.hadoop.io.VersionedWritable; +import org.apache.hadoop.io.VersionMismatchException; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; /** * @author Andrzej Bialecki */ -public class ProtocolStatus extends VersionedWritable { +public class ProtocolStatus implements Writable { - private final static byte VERSION = 1; + private final static byte VERSION = 2; /** Content was retrieved without errors. 
*/ public static final int SUCCESS = 1; @@ -110,10 +111,6 @@ } - public byte getVersion() { - return VERSION; - } - public ProtocolStatus(int code, String[] args) { this.code = code; this.args = args; @@ -154,17 +151,32 @@ } public void readFields(DataInput in) throws IOException { - super.readFields(in); // check version - code = in.readByte(); - lastModified = in.readLong(); - args = WritableUtils.readCompressedStringArray(in); + byte version = in.readByte(); + switch(version) { + case 1: + code = in.readByte(); + lastModified = in.readLong(); + args = WritableUtils.readCompressedStringArray(in); + break; + case VERSION: + code = in.readByte(); + lastModified = in.readLong(); + args = WritableUtils.readStringArray(in); + break; + default: + throw new VersionMismatchException(VERSION, version); + } } public void write(DataOutput out) throws IOException { - super.write(out); // write version + out.writeByte(VERSION); out.writeByte((byte)code); out.writeLong(lastModified); - WritableUtils.writeCompressedStringArray(out, args); + if (args == null) { + out.writeInt(-1); + } else { + WritableUtils.writeStringArray(out, args); + } } public void setArgs(String[] args) { Modified: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java?view=diff&rev=556946&r1=556945&r2=556946 ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java Tue Jul 17 08:16:40 2007 @@ -238,13 +238,15 @@ } else { wname = new Path(new Path(new Path(job.getOutputPath(), segmentName + "-" + slice), dirName), name); } - res = new SequenceFile.Writer(fs, job, wname, Text.class, CrawlDatum.class, progress, new SequenceFile.Metadata()); + res = SequenceFile.createWriter(fs, job, wname, Text.class, + 
CrawlDatum.class, + SequenceFile.getCompressionType(job), progress); sliceWriters.put(slice + dirName, res); return res; } // lazily create MapFile-s. - private MapFile.Writer ensureMapFile(String slice, String dirName, Class clazz) throws IOException { + private MapFile.Writer ensureMapFile(String slice, String dirName, Class<? extends Writable> clazz) throws IOException { if (slice == null) slice = DEFAULT_SLICE; MapFile.Writer res = (MapFile.Writer)sliceWriters.get(slice + dirName); if (res != null) return res; @@ -254,7 +256,11 @@ } else { wname = new Path(new Path(new Path(job.getOutputPath(), segmentName + "-" + slice), dirName), name); } - res = new MapFile.Writer(job, fs, wname.toString(), Text.class, clazz, CompressionType.RECORD, progress); + CompressionType compType = SequenceFile.getCompressionType(job); + if (clazz.isAssignableFrom(ParseText.class)) { + compType = CompressionType.RECORD; + } + res = new MapFile.Writer(job, fs, wname.toString(), Text.class, clazz, compType, progress); sliceWriters.put(slice + dirName, res); return res; }