Author: cutting
Date: Fri Oct 7 15:16:27 2005
New Revision: 307203

URL: http://svn.apache.org/viewcvs?rev=307203&view=rev
Log:
First working version of MapReduce-based dedup.
Added:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/DeleteDuplicates.java
Modified:
    lucene/nutch/branches/mapred/bin/nutch
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Crawl.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/indexer/NdfsDirectory.java

Modified: lucene/nutch/branches/mapred/bin/nutch
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/bin/nutch?rev=307203&r1=307202&r2=307203&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/bin/nutch (original)
+++ lucene/nutch/branches/mapred/bin/nutch Fri Oct 7 15:16:27 2005
@@ -138,6 +138,8 @@
   CLASS=org.apache.nutch.crawl.LinkDb
 elif [ "$COMMAND" = "index" ] ; then
   CLASS=org.apache.nutch.crawl.Indexer
+elif [ "$COMMAND" = "dedup" ] ; then
+  CLASS=org.apache.nutch.crawl.DeleteDuplicates
 elif [ "$COMMAND" = "merge" ] ; then
   CLASS=org.apache.nutch.indexer.IndexMerger
 elif [ "$COMMAND" = "server" ] ; then

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Crawl.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Crawl.java?rev=307203&r1=307202&r2=307203&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Crawl.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Crawl.java Fri Oct 7 15:16:27 2005
@@ -109,8 +109,9 @@
 
     new LinkDb(conf).invert(linkDb, segments);    // invert links
 
-    // index
+    // index & dedup
     new Indexer(conf).index(index, linkDb, fs.listFiles(segments));
+    new DeleteDuplicates(conf).dedup(new File[] { index });
 
     LOG.info("crawl finished: " + dir);
   }

Added: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/DeleteDuplicates.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/DeleteDuplicates.java?rev=307203&view=auto
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/DeleteDuplicates.java (added)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/DeleteDuplicates.java Fri Oct 7 15:16:27 2005
@@ -0,0 +1,338 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.*;
+import java.security.*;
+import java.text.*;
+import java.util.*;
+import java.util.logging.*;
+
+import org.apache.nutch.io.*;
+import org.apache.nutch.fs.*;
+import org.apache.nutch.util.*;
+import org.apache.nutch.mapred.*;
+import org.apache.nutch.indexer.*;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.document.Document;
+
+/******************************************************************
+ * Deletes duplicate documents in a set of Lucene indexes.
+ * Duplicates have either the same contents (via MD5 hash) or the same URL.
+ ******************************************************************/
+public class DeleteDuplicates extends NutchConfigured
+  implements Mapper, OutputFormat {
+  private static final Logger LOG =
+    LogFormatter.getLogger("org.apache.nutch.crawl.DeleteDuplicates");
+
+// Algorithm:
+//
+// 1. map indexes -> <<md5, score, urlLen>, <index,doc>>
+//    partition by md5
+//    reduce, deleting all but largest score w/ shortest url
+//
+// 2. map indexes -> <<url, fetchdate>, <index,doc>>
+//    partition by url
+//    reduce, deleting all but most recent.
+//
+// Part 2 is not yet implemented, but the Indexer currently only indexes one
+// URL per page, so this is not a critical problem.
+
+  public static class IndexDoc implements WritableComparable {
+    private UTF8 index;                           // the segment index
+    private int doc;                              // within the index
+
+    public void write(DataOutput out) throws IOException {
+      index.write(out);
+      out.writeInt(doc);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      if (index == null) {
+        index = new UTF8();
+      }
+      index.readFields(in);
+      this.doc = in.readInt();
+    }
+
+    public int compareTo(Object o) {
+      IndexDoc that = (IndexDoc)o;
+      int indexCompare = this.index.compareTo(that.index);
+      if (indexCompare != 0) {                    // prefer later indexes
+        return indexCompare;
+      } else {
+        return this.doc - that.doc;               // prefer later docs
+      }
+    }
+
+    public boolean equals(Object o) {
+      IndexDoc that = (IndexDoc)o;
+      return this.index.equals(that.index) && this.doc == that.doc;
+    }
+
+  }
+
+  public static class HashScore implements WritableComparable {
+    private MD5Hash hash;
+    private float score;
+    private int urlLen;
+
+    public void write(DataOutput out) throws IOException {
+      hash.write(out);
+      out.writeFloat(score);
+      out.writeInt(urlLen);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      if (hash == null) {
+        hash = new MD5Hash();
+      }
+      hash.readFields(in);
+      score = in.readFloat();
+      urlLen = in.readInt();
+    }
+
+    public int compareTo(Object o) {
+      HashScore that = (HashScore)o;
+      if (!this.hash.equals(that.hash)) {         // order first by hash
+        return this.hash.compareTo(that.hash);
+      } else if (this.score != that.score) {      // prefer larger scores
+        return this.score < that.score ? 1 : -1;
+      } else {                                    // prefer shorter urls
+        return this.urlLen - that.urlLen;
+      }
+    }
+
+    public boolean equals(Object o) {
+      HashScore that = (HashScore)o;
+      return this.hash.equals(that.hash)
+        && this.score == that.score
+        && this.urlLen == that.urlLen;
+    }
+  }
+
+  public static class InputFormat extends InputFormatBase {
+    private static final int INDEX_LENGTH = Integer.MAX_VALUE;
+
+    /** Return each index as a split. */
+    public FileSplit[] getSplits(NutchFileSystem fs, JobConf job,
+                                 int numSplits)
+      throws IOException {
+      File[] files = listFiles(fs, job);
+      FileSplit[] splits = new FileSplit[files.length];
+      for (int i = 0; i < files.length; i++) {
+        splits[i] = new FileSplit(files[i], 0, INDEX_LENGTH);
+      }
+      return splits;
+    }
+
+    /** Return each index as a split.
+     */
+    public RecordReader getRecordReader(final NutchFileSystem fs,
+                                        final FileSplit split,
+                                        final JobConf job,
+                                        Reporter reporter) throws IOException {
+      final UTF8 index = new UTF8(split.getFile().toString());
+      reporter.setStatus(index.toString());
+      return new RecordReader() {
+
+          private IndexReader indexReader =
+            IndexReader.open(new NdfsDirectory(fs, split.getFile(), false));
+
+          { indexReader.undeleteAll(); }
+
+          private final int maxDoc = indexReader.maxDoc();
+          private int doc;
+
+          public boolean next(Writable key, Writable value)
+            throws IOException {
+
+            if (doc >= maxDoc)
+              return false;
+
+            Document document = indexReader.document(doc);
+
+            // fill in key
+            if (key instanceof UTF8) {
+              ((UTF8)key).set(document.get("url"));
+            } else {
+              HashScore hashScore = (HashScore)key;
+              if (hashScore.hash == null) {
+                hashScore.hash = new MD5Hash();
+              }
+              hashScore.hash.setDigest(document.get("digest"));
+              hashScore.score = Float.parseFloat(document.get("boost"));
+              hashScore.urlLen = document.get("url").length();
+            }
+
+            // fill in value
+            IndexDoc indexDoc = (IndexDoc)value;
+            if (indexDoc.index == null) {
+              indexDoc.index = new UTF8();
+            }
+            indexDoc.index.set(index);
+            indexDoc.doc = doc;
+
+            doc++;
+
+            return true;
+          }
+
+          public long getPos() throws IOException {
+            return (doc*INDEX_LENGTH)/maxDoc;
+          }
+
+          public void close() throws IOException {
+            indexReader.close();
+          }
+        };
+    }
+  }
+
+  public static class HashPartitioner implements Partitioner {
+    public void configure(JobConf job) {}
+    public int getPartition(WritableComparable key, Writable value,
+                            int numReduceTasks) {
+      int hashCode = ((HashScore)key).hash.hashCode();
+      return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
+    }
+  }
+
+  public static class HashReducer implements Reducer {
+    private MD5Hash prevHash = new MD5Hash();
+    public void configure(JobConf job) {}
+    public void reduce(WritableComparable key, Iterator values,
+                       OutputCollector output, Reporter reporter)
+      throws IOException {
+      MD5Hash hash = ((HashScore)key).hash;
+      while (values.hasNext()) {
+        Writable value = (Writable)values.next();
+        if (hash.equals(prevHash)) {              // collect all but first
+          output.collect(key, value);
+        } else {
+          prevHash.set(hash);
+        }
+      }
+    }
+  }
+
+  public DeleteDuplicates() { super(null); }
+
+  public DeleteDuplicates(NutchConf conf) { super(conf); }
+
+  public void configure(JobConf job) {}
+
+  public void map(WritableComparable key, Writable value,
+                  OutputCollector output, Reporter reporter)
+    throws IOException {
+    IndexDoc indexDoc = (IndexDoc)value;
+    output.collect(indexDoc.index, new IntWritable(indexDoc.doc));
+  }
+
+  private HashMap readers = new HashMap();
+  public RecordWriter getRecordWriter(final NutchFileSystem fs,
+                                      final JobConf job,
+                                      final String name) throws IOException {
+    return new RecordWriter() {
+        /** Delete value from index named in key. */
+        public void write(WritableComparable key, Writable value)
+          throws IOException {
+          IndexReader reader = (IndexReader)readers.get(key);
+          if (reader == null) {
+            File index = new File(key.toString());
+            reader = IndexReader.open(new NdfsDirectory(fs, index, false));
+            readers.put(key, reader);
+          }
+          reader.delete(((IntWritable)value).get());
+        }
+
+        /** Close indexes, flushing deletions.
+         */
+        public void close(Reporter reporter) throws IOException {
+          Iterator i = readers.values().iterator();
+          while (i.hasNext()) {
+            ((IndexReader)i.next()).close();
+          }
+        }
+      };
+  }
+
+  public void dedup(File[] indexDirs)
+    throws IOException {
+
+    LOG.info("Dedup: starting");
+
+    File hashDir =
+      new File("dedup-hash-"+
+               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf job = new JobConf(getConf());
+
+    for (int i = 0; i < indexDirs.length; i++) {
+      LOG.info("Dedup: adding indexes in: " + indexDirs[i]);
+      job.addInputDir(indexDirs[i]);
+    }
+
+    job.setInputKeyClass(HashScore.class);
+    job.setInputValueClass(IndexDoc.class);
+    job.setInputFormat(InputFormat.class);
+
+    job.setPartitionerClass(HashPartitioner.class);
+    job.setReducerClass(HashReducer.class);
+
+    job.setOutputDir(hashDir);
+
+    job.setOutputKeyClass(HashScore.class);
+    job.setOutputValueClass(IndexDoc.class);
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+
+    JobClient.runJob(job);
+
+    job = new JobConf(getConf());
+
+    job.addInputDir(hashDir);
+
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(HashScore.class);
+    job.setInputValueClass(IndexDoc.class);
+
+    job.setMapperClass(DeleteDuplicates.class);
+
+    job.setOutputFormat(DeleteDuplicates.class);
+    job.setOutputKeyClass(UTF8.class);
+    job.setOutputValueClass(IntWritable.class);
+
+    JobClient.runJob(job);
+    LOG.info("Dedup: done");
+  }
+
+  public static void main(String[] args) throws Exception {
+    DeleteDuplicates dedup = new DeleteDuplicates(NutchConf.get());
+
+    if (args.length < 1) {
+      System.err.println("Usage: <indexes> ...");
+      return;
+    }
+
+    File[] indexes = new File[args.length];
+    for (int i = 0; i < args.length; i++) {
+      indexes[i] = new File(args[i]);
+    }
+
+    dedup.dedup(indexes);
+  }
+
+}

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/indexer/NdfsDirectory.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/indexer/NdfsDirectory.java?rev=307203&r1=307202&r2=307203&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/indexer/NdfsDirectory.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/indexer/NdfsDirectory.java Fri Oct 7 15:16:27 2005
@@ -89,7 +89,13 @@
   }
 
   public void renameFile(String from, String to) throws IOException {
-    fs.rename(new File(directory, from), new File(directory, to));
+    // NDFS is currently broken when target already exists,
+    // so we explicitly delete the target first.
+    File target = new File(directory, to);
+    if (fs.exists(target)) {
+      fs.delete(target);
+    }
+    fs.rename(new File(directory, from), target);
   }
 
   public IndexOutput createOutput(String name) throws IOException {
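
The algorithm comment at the top of DeleteDuplicates.java reduces, for the implemented hash pass, to a simple selection rule: within each group of documents sharing an MD5 hash, keep the one with the largest score (breaking ties by shortest URL) and delete the rest. The standalone sketch below is not part of this patch; its class and field names are purely illustrative. It only mimics how the HashScore ordering and HashReducer combine to apply that rule, outside of MapReduce. With the bin/nutch change above, the real pass should also be runnable as "bin/nutch dedup <indexdir> ..." per the usage string in main().

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Minimal sketch (assumed names, not patch code) of the hash-pass rule:
// sort by <hash, -score, urlLen>, keep the first doc of each hash group,
// mark the rest for deletion.
public class DedupRuleSketch {

  // Stand-in for the fields carried by a HashScore key and IndexDoc value.
  static class Doc {
    final String md5;
    final float score;
    final int urlLen;
    final int id;

    Doc(String md5, float score, int urlLen, int id) {
      this.md5 = md5;
      this.score = score;
      this.urlLen = urlLen;
      this.id = id;
    }
  }

  public static void main(String[] args) {
    List<Doc> docs = new ArrayList<Doc>(Arrays.asList(
        new Doc("abc", 1.0f, 20, 1),    // deleted: lower score than doc 2
        new Doc("abc", 2.0f, 30, 2),    // kept: highest score for hash "abc"
        new Doc("abc", 1.0f, 10, 3),    // deleted: lower score than doc 2
        new Doc("xyz", 0.5f, 15, 4)));  // kept: only doc for hash "xyz"

    // Same ordering HashScore.compareTo imposes: group by hash, then prefer
    // larger scores, then shorter URLs.
    docs.sort((a, b) -> {
      if (!a.md5.equals(b.md5)) return a.md5.compareTo(b.md5);
      if (a.score != b.score) return a.score < b.score ? 1 : -1;
      return a.urlLen - b.urlLen;
    });

    // Same effect as HashReducer: the first document of each hash group is
    // kept; every later one is emitted as a duplicate to delete.
    String prevHash = null;
    for (Doc d : docs) {
      if (d.md5.equals(prevHash)) {
        System.out.println("delete doc " + d.id);  // prints doc 3, then doc 1
      } else {
        prevHash = d.md5;
      }
    }
  }
}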