Author: cutting
Date: Fri Oct  7 15:16:27 2005
New Revision: 307203

First working version of MapReduce-based dedup.


Modified: lucene/nutch/branches/mapred/bin/nutch
--- lucene/nutch/branches/mapred/bin/nutch (original)
+++ lucene/nutch/branches/mapred/bin/nutch Fri Oct  7 15:16:27 2005
@@ -138,6 +138,8 @@
 elif [ "$COMMAND" = "index" ] ; then
+elif [ "$COMMAND" = "dedup" ] ; then
+  CLASS=org.apache.nutch.crawl.DeleteDuplicates
 elif [ "$COMMAND" = "merge" ] ; then
 elif [ "$COMMAND" = "server" ] ; then

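Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Crawl.java
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Crawl.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Crawl.java Fri Oct  7 15:16:27 2005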
@@ -109,8 +109,9 @@
     new LinkDb(conf).invert(linkDb, segments); // invert links
-    // index
+    // index & dedup
     new Indexer(conf).index(index, linkDb, fs.listFiles(segments));
+    new DeleteDuplicates(conf).dedup(new File[] { index });
     LOG.info("crawl finished: " + dir);

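Added: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/DeleteDuplicates.java
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/DeleteDuplicates.java (added)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/DeleteDuplicates.java Fri Oct  7 15:16:27 2005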
@@ -0,0 +1,338 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.crawl;
+import java.io.*;
+import java.text.*;
+import java.util.*;
+import java.util.logging.*;
+import org.apache.nutch.io.*;
+import org.apache.nutch.fs.*;
+import org.apache.nutch.util.*;
+import org.apache.nutch.mapred.*;
+import org.apache.nutch.indexer.*;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.document.Document;
+/******************************************************************
+ * Deletes duplicate documents in a set of Lucene indexes.
+ * Duplicates have either the same contents (via MD5 hash) or the same URL.
+ ******************************************************************/
+public class DeleteDuplicates extends NutchConfigured
+  implements Mapper, OutputFormat {
+  private static final Logger LOG =
+    LogFormatter.getLogger("org.apache.nutch.crawl.DeleteDuplicates");
+//   Algorithm:
+//   1. map indexes -> <<md5, score, urlLen>, <index,doc>>
+//      partition by md5
+//      reduce, deleting all but largest score w/ shortest url
+//   2. map indexes -> <<url, fetchdate>, <index,doc>>
+//      partition by url
+//      reduce, deleting all but most recent.
+//   Part 2 is not yet implemented, but the Indexer currently only indexes one
+//   URL per page, so this is not a critical problem.
+  /**
+   * Identifies a single document by the name of the index that holds it and
+   * the document's number within that index.
+   */
+  public static class IndexDoc implements WritableComparable {
+    private UTF8 index;                           // the segment index
+    private int doc;                              // within the index
+    /** Writes the index name followed by the document number. */
+    public void write(DataOutput out) throws IOException {
+      index.write(out);
+      out.writeInt(doc);
+    }
+    /** Reads the index name and document number, allocating the name
+     * holder on first use so instances can be reused. */
+    public void readFields(DataInput in) throws IOException {
+      if (index == null) {
+        index = new UTF8();
+      }
+      index.readFields(in);
+      this.doc = in.readInt();
+    }
+    /** Orders by index name, then by document number.
+     * @throws ClassCastException if <code>o</code> is not an IndexDoc */
+    public int compareTo(Object o) {
+      IndexDoc that = (IndexDoc)o;
+      int indexCompare = this.index.compareTo(that.index);
+      if (indexCompare != 0) {                    // prefer later indexes
+        return indexCompare;
+      }
+      // prefer later docs; compare explicitly, since subtracting ints can
+      // overflow and flip the sign
+      if (this.doc == that.doc) {
+        return 0;
+      }
+      return this.doc < that.doc ? -1 : 1;
+    }
+    /** Two instances are equal when both index name and document number
+     * match; any other kind of object is unequal. */
+    public boolean equals(Object o) {
+      if (!(o instanceof IndexDoc)) {
+        return false;
+      }
+      IndexDoc that = (IndexDoc)o;
+      return this.index.equals(that.index) && this.doc == that.doc;
+    }
+    /** Hash code consistent with {@link #equals(Object)}. */
+    public int hashCode() {
+      return (index == null ? 0 : index.hashCode()) ^ doc;
+    }
+  }
+  public static class HashScore implements WritableComparable {
+    private MD5Hash hash;
+    private float score;
+    private int urlLen;
+    public void write(DataOutput out) throws IOException {
+      hash.write(out);
+      out.writeFloat(score);
+      out.writeInt(urlLen);
+    }
+    public void readFields(DataInput in) throws IOException {
+      if (hash == null) {
+        hash = new MD5Hash();
+      }
+      hash.readFields(in);
+      score = in.readFloat();
+      urlLen = in.readInt();
+    }
+    public int compareTo(Object o) {
+      HashScore that = (HashScore)o;
+      if (!this.hash.equals(that.hash)) {         // order first by hash
+        return this.hash.compareTo(that.hash);
+      } else if (this.score != that.score) {      // prefer larger scores
+        return this.score < that.score ? 1 : -1 ;
+      } else {                                    // prefer shorter urls
+        return this.urlLen - that.urlLen;
+      }
+    }
+    public boolean equals(Object o) {
+      HashScore that = (HashScore)o;
+      return this.hash.equals(that.hash)
+        && this.score == that.score
+        && this.urlLen == that.urlLen;
+    }
+  }
+  /**
+   * Input format that treats each Lucene index as one split and yields one
+   * record per document number in that index.
+   */
+  public static class InputFormat extends InputFormatBase {
+    // nominal length given to every split; indexes have no byte length here
+    private static final int INDEX_LENGTH = Integer.MAX_VALUE;
+    /** Return each index as a split. */
+    public FileSplit[] getSplits(NutchFileSystem fs, JobConf job,
+                                 int numSplits)
+      throws IOException {
+      File[] files = listFiles(fs, job);
+      FileSplit[] splits = new FileSplit[files.length];
+      for (int i = 0; i < files.length; i++) {
+        splits[i] = new FileSplit(files[i], 0, INDEX_LENGTH);
+      }
+      return splits;
+    }
+    /** Return a reader that walks the documents of the split's index,
+     * filling the key from the stored fields and the value with the
+     * index name and document number. */
+    public RecordReader getRecordReader(final NutchFileSystem fs,
+                                        final FileSplit split,
+                                        final JobConf job,
+                                        Reporter reporter) throws IOException {
+      final UTF8 index = new UTF8(split.getFile().toString());
+      reporter.setStatus(index.toString());
+      return new RecordReader() {
+          private IndexReader indexReader =
+            IndexReader.open(new NdfsDirectory(fs, split.getFile(), false));
+          // start from a clean slate: discard deletions from earlier runs
+          { indexReader.undeleteAll(); }
+          private final int maxDoc = indexReader.maxDoc();
+          private int doc;
+          public boolean next(Writable key, Writable value)
+            throws IOException {
+            if (doc >= maxDoc)
+              return false;
+            Document document = indexReader.document(doc);
+            // fill in key
+            if (key instanceof UTF8) {
+              ((UTF8)key).set(document.get("url"));
+            } else {
+              HashScore hashScore = (HashScore)key;
+              if (hashScore.hash == null) {
+                hashScore.hash = new MD5Hash();
+              }
+              hashScore.hash.setDigest(document.get("digest"));
+              hashScore.score = Float.parseFloat(document.get("boost"));
+              hashScore.urlLen = document.get("url").length();
+            }
+            // fill in value
+            IndexDoc indexDoc = (IndexDoc)value;
+            if (indexDoc.index == null) {
+              indexDoc.index = new UTF8();
+            }
+            indexDoc.index.set(index);
+            indexDoc.doc = doc;
+            doc++;
+            return true;
+          }
+          /** Position scaled to the nominal split length. */
+          public long getPos() throws IOException {
+            if (maxDoc == 0) {                    // empty index: nothing left
+              return INDEX_LENGTH;
+            }
+            // widen before multiplying: doc*INDEX_LENGTH overflows an int
+            return ((long)doc * INDEX_LENGTH) / maxDoc;
+          }
+          public void close() throws IOException {
+            indexReader.close();
+          }
+        };
+    }
+  }
+  /** Partitions records by content hash alone, so that all keys sharing a
+   * hash go to the same reduce task whatever their score or URL length. */
+  public static class HashPartitioner implements Partitioner {
+    /** Nothing to configure. */
+    public void configure(JobConf job) {
+    }
+    /** Returns a partition in the range [0, numReduceTasks) derived from
+     * the hash code of the key's MD5 hash. */
+    public int getPartition(WritableComparable key, Writable value,
+                            int numReduceTasks) {
+      HashScore hashScore = (HashScore)key;
+      // clear the sign bit so the remainder below is never negative
+      int nonNegative = hashScore.hash.hashCode() & Integer.MAX_VALUE;
+      return nonNegative % numReduceTasks;
+    }
+  }
+  /**
+   * Emits every document that shares a content hash with a document seen
+   * earlier, i.e. all but the first document for each hash.
+   * NOTE(review): relies on keys reaching a reduce task ordered by
+   * HashScore.compareTo, so the preferred document comes first -- confirm
+   * against the framework's sort.
+   */
+  public static class HashReducer implements Reducer {
+    // hash of the most recent document that was kept
+    private MD5Hash prevHash = new MD5Hash();
+    /** Nothing to configure. */
+    public void configure(JobConf job) {}
+    /** Collects each value whose hash equals the previously kept hash;
+     * the first value seen for a new hash is kept and only remembered. */
+    public void reduce(WritableComparable key, Iterator values,
+                       OutputCollector output, Reporter reporter)
+      throws IOException {
+      MD5Hash hash = ((HashScore)key).hash;
+      while (values.hasNext()) {
+        Writable value = (Writable)values.next();
+        if (hash.equals(prevHash)) {                // collect all but first
+          output.collect(key, value);
+        } else {
+          prevHash.set(hash);
+        }
+      }
+    }
+  }
+  /** Creates an instance without a configuration. */
+  public DeleteDuplicates() {
+    super(null);
+  }
+  /** Creates an instance that uses the given configuration. */
+  public DeleteDuplicates(NutchConf conf) {
+    super(conf);
+  }
+  public void configure(JobConf job) {}
+  /** Re-keys a duplicate by the index that holds it: emits the index name
+   * as key and the document number as value.  The incoming key is unused. */
+  public void map(WritableComparable key, Writable value,
+                  OutputCollector output, Reporter reporter)
+    throws IOException {
+    IndexDoc duplicate = (IndexDoc)value;
+    UTF8 indexName = duplicate.index;
+    IntWritable docNumber = new IntWritable(duplicate.doc);
+    output.collect(indexName, docNumber);
+  }
+  /** Open index readers, keyed by the index name as a String. */
+  private HashMap readers = new HashMap();
+  /** Returns a writer that, instead of writing records, deletes the
+   * document numbered by each value from the index named by its key.
+   * Deletions are flushed when the writer is closed. */
+  public RecordWriter getRecordWriter(final NutchFileSystem fs,
+                                      final JobConf job,
+                                      final String name) throws IOException {
+    return new RecordWriter() {                   
+        /** Delete value from index named in key. */
+        public void write(WritableComparable key, Writable value)
+          throws IOException {
+          // key by an immutable String: the caller may reuse and overwrite
+          // the Writable key object, which would corrupt the map
+          String indexName = key.toString();
+          IndexReader reader = (IndexReader)readers.get(indexName);
+          if (reader == null) {
+            File index = new File(indexName);
+            reader = IndexReader.open(new NdfsDirectory(fs, index, false));
+            readers.put(indexName, reader);
+          }
+          reader.delete(((IntWritable)value).get());
+        }
+        /** Close indexes, flushing deletions. */
+        public void close(Reporter reporter) throws IOException {
+          Iterator i = readers.values().iterator();
+          while (i.hasNext()) {
+            ((IndexReader)i.next()).close();
+            i.remove();                           // never hand out a closed reader
+          }
+        }
+      };
+  }
+  /**
+   * Removes content duplicates from the indexes found in the given
+   * directories by running two jobs in sequence.  The first reads every
+   * document, partitions and reduces by content hash, and writes the
+   * documents to delete into a randomly named "dedup-hash-N" directory.
+   * The second reads that directory and applies the deletions, using this
+   * class as both mapper and output format.
+   * NOTE(review): the intermediate directory is not removed afterwards.
+   *
+   * @param indexDirs directories containing the indexes to deduplicate
+   * @throws IOException if either job fails
+   */
+  public void dedup(File[] indexDirs)
+    throws IOException {
+    LOG.info("Dedup: starting");
+    File hashDir =
+      new File("dedup-hash-"+
+               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+    // job 1: find duplicates by content hash
+    JobConf job = new JobConf(getConf());
+    for (int i = 0; i < indexDirs.length; i++) {
+      LOG.info("Dedup: adding indexes in: " + indexDirs[i]);
+      job.addInputDir(indexDirs[i]);
+    }
+    job.setInputKeyClass(HashScore.class);
+    job.setInputValueClass(IndexDoc.class);
+    job.setInputFormat(InputFormat.class);
+    job.setPartitionerClass(HashPartitioner.class);
+    job.setReducerClass(HashReducer.class);
+    job.setOutputDir(hashDir);
+    job.setOutputKeyClass(HashScore.class);
+    job.setOutputValueClass(IndexDoc.class);
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+    JobClient.runJob(job);
+    // job 2: delete the duplicates found by job 1
+    job = new JobConf(getConf());
+    job.addInputDir(hashDir);
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(HashScore.class);
+    job.setInputValueClass(IndexDoc.class);
+    job.setMapperClass(DeleteDuplicates.class);
+    job.setOutputFormat(DeleteDuplicates.class);
+    job.setOutputKeyClass(UTF8.class);
+    job.setOutputValueClass(IntWritable.class);
+    JobClient.runJob(job);
+    LOG.info("Dedup: done");
+  }
+  /** Command-line entry point: treats every argument as an index directory
+   * and deduplicates them together.  Prints a usage line and returns when
+   * no argument is given. */
+  public static void main(String[] args) throws Exception {
+    DeleteDuplicates dedup = new DeleteDuplicates(NutchConf.get());
+    int count = args.length;
+    if (count == 0) {
+      System.err.println("Usage: <indexes> ...");
+      return;
+    }
+    File[] indexes = new File[count];
+    int next = 0;
+    while (next < count) {
+      indexes[next] = new File(args[next]);
+      next++;
+    }
+    dedup.dedup(indexes);
+  }

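Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/indexer/NdfsDirectory.java
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/indexer/NdfsDirectory.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/indexer/NdfsDirectory.java Fri Oct  7 15:16:27 2005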
@@ -89,7 +89,13 @@
   public void renameFile(String from, String to) throws IOException {
-    fs.rename(new File(directory, from), new File(directory, to));
+    // NDFS is currently broken when target already exists,
+    // so we explicitly delete the target first.
+    File target = new File(directory, to);
+    if (fs.exists(target)) {
+      fs.delete(target);
+    }
+    fs.rename(new File(directory, from), target);
   public IndexOutput createOutput(String name) throws IOException {

Reply via email to