Author: dogacan
Date: Mon Mar  9 17:34:51 2009
New Revision: 751774

URL: http://svn.apache.org/viewvc?rev=751774&view=rev
Log:
NUTCH-684 - Dedup support for Solr

Added:
    lucene/nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrDeleteDuplicates.java
Modified:
    lucene/nutch/trunk/CHANGES.txt
    lucene/nutch/trunk/bin/nutch
    lucene/nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrConstants.java

Modified: lucene/nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/CHANGES.txt?rev=751774&r1=751773&r2=751774&view=diff
==============================================================================
--- lucene/nutch/trunk/CHANGES.txt (original)
+++ lucene/nutch/trunk/CHANGES.txt Mon Mar  9 17:34:51 2009
@@ -376,6 +376,7 @@
 
 141. NUTCH-711 - Indexer failing after upgrade to Hadoop 0.19.1 (ab)
 
+142. NUTCH-684 - Dedup support for Solr. (dogacan)
 
 Release 0.9 - 2007-04-02
 

Modified: lucene/nutch/trunk/bin/nutch
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/bin/nutch?rev=751774&r1=751773&r2=751774&view=diff
==============================================================================
--- lucene/nutch/trunk/bin/nutch (original)
+++ lucene/nutch/trunk/bin/nutch Mon Mar  9 17:34:51 2009
@@ -51,6 +51,7 @@
   echo "  solrindex         run the solr indexer on parsed segments and linkdb"
   echo "  merge             merge several segment indexes"
   echo "  dedup             remove duplicates from a set of segment indexes"
+  echo "  solrdedup         remove duplicates from solr"
   echo "  plugin            load a plugin and run one of its classes main()"
   echo "  server            run a search server"
   echo " or"
@@ -234,6 +235,8 @@
   CLASS=org.apache.nutch.indexer.solr.SolrIndexer
 elif [ "$COMMAND" = "dedup" ] ; then
   CLASS=org.apache.nutch.indexer.DeleteDuplicates
+elif [ "$COMMAND" = "solrdedup" ] ; then
+  CLASS=org.apache.nutch.indexer.solr.SolrDeleteDuplicates
 elif [ "$COMMAND" = "merge" ] ; then
   CLASS=org.apache.nutch.indexer.IndexMerger
 elif [ "$COMMAND" = "plugin" ] ; then

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrConstants.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrConstants.java?rev=751774&r1=751773&r2=751774&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrConstants.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrConstants.java Mon Mar  9 17:34:51 2009
@@ -22,5 +22,15 @@
   public static final String SERVER_URL = SOLR_PREFIX + "server.url";
 
   public static final String COMMIT_SIZE = SOLR_PREFIX + "commit.size";
+  
+  public static final String ID_FIELD = "id";
+  
+  public static final String URL_FIELD = "url";
+  
+  public static final String BOOST_FIELD = "boost";
+  
+  public static final String TIMESTAMP_FIELD = "tstamp";
+  
+  public static final String DIGEST_FIELD = "digest";
 
 }

Added: lucene/nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrDeleteDuplicates.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrDeleteDuplicates.java?rev=751774&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrDeleteDuplicates.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrDeleteDuplicates.java Mon Mar  9 17:34:51 2009
@@ -0,0 +1,370 @@
+package org.apache.nutch.indexer.solr;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.indexer.DeleteDuplicates;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+
+/** 
+ * Utility class for deleting duplicate documents from a solr index.
+ *
+ * The algorithm works as follows:
+ * 
+ * Preparation:
+ * <ol>
+ * <li>Query the solr server for the number of documents (say, N)</li>
+ * <li>Partition N among M map tasks. For example, if we have two map tasks
+ * the first map task will deal with solr documents from 0 to (N / 2 - 1) and
+ * the second will deal with documents from (N / 2) to (N - 1).</li>
+ * </ol>
+ * 
+ * MapReduce:
+ * <ul>
+ * <li>Map: Identity map where keys are digests and values are {@link SolrRecord}
+ * instances (which contain id, boost and timestamp)</li>
+ * <li>Reduce: After the map phase, {@link SolrRecord}s with the same digest are
+ * grouped together. Of these documents with the same digest, all are deleted
+ * except the one with the highest score (boost field). If two (or more)
+ * documents have the same score, the one with the latest timestamp is kept;
+ * every other document is deleted from the solr index.
+ * </li>
+ * </ul>
+ * 
+ * Note that unlike {@link DeleteDuplicates}, we assume that two documents in
+ * a solr index will never have the same URL. So this class only deals with
+ * documents with <b>different</b> URLs but the same digest. 
+ */
+public class SolrDeleteDuplicates
+implements Reducer<Text, SolrDeleteDuplicates.SolrRecord, Text, SolrDeleteDuplicates.SolrRecord>,
+Tool {
+
+  public static final Log LOG = LogFactory.getLog(SolrDeleteDuplicates.class);
+
+  private static final String SOLR_GET_ALL_QUERY = SolrConstants.ID_FIELD + ":[* TO *]";
+
+  private static final int NUM_MAX_DELETE_REQUEST = 1000;
+
+  public static class SolrRecord implements Writable {
+
+    private float boost;
+    private long tstamp;
+    private String id;
+
+    public SolrRecord() { }
+
+    public SolrRecord(String id, float boost, long tstamp) {
+      this.id = id;
+      this.boost = boost;
+      this.tstamp = tstamp;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public float getBoost() {
+      return boost;
+    }
+
+    public long getTstamp() {
+      return tstamp;
+    }
+
+    public void readSolrDocument(SolrDocument doc) {
+      id = (String)doc.getFieldValue(SolrConstants.ID_FIELD);
+      boost = (Float)doc.getFieldValue(SolrConstants.BOOST_FIELD);
+      tstamp = (Long)doc.getFieldValue(SolrConstants.TIMESTAMP_FIELD);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      id = Text.readString(in);
+      boost = in.readFloat();
+      tstamp = in.readLong();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      Text.writeString(out, id);
+      out.writeFloat(boost);
+      out.writeLong(tstamp);
+    } 
+  }
+
+  public static class SolrInputSplit implements InputSplit {
+
+    private int docBegin;
+    private int numDocs;
+
+    public SolrInputSplit() { }
+
+    public SolrInputSplit(int docBegin, int numDocs) {
+      this.docBegin = docBegin;
+      this.numDocs = numDocs;
+    }
+
+    public int getDocBegin() {
+      return docBegin;
+    }
+
+    public int getNumDocs() {
+      return numDocs;
+    }
+
+    @Override
+    public long getLength() throws IOException {
+      return numDocs;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException {
+      return new String[] {} ;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      docBegin = in.readInt();
+      numDocs = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(docBegin);
+      out.writeInt(numDocs);
+    }
+  }
+
+  public static class SolrInputFormat implements InputFormat<Text, SolrRecord> {
+
+    /** Partition the total number of solr documents into roughly equal splits. */
+    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+      SolrServer solr = new CommonsHttpSolrServer(job.get(SolrConstants.SERVER_URL));
+
+      final SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY);
+      solrQuery.setFields(SolrConstants.ID_FIELD);
+      solrQuery.setRows(1);
+
+      QueryResponse response;
+      try {
+        response = solr.query(solrQuery);
+      } catch (final SolrServerException e) {
+        throw new IOException(e);
+      }
+
+      int numResults = (int)response.getResults().getNumFound();
+      int numDocsPerSplit = (numResults / numSplits); 
+      int currentDoc = 0;
+      SolrInputSplit[] splits = new SolrInputSplit[numSplits];
+      for (int i = 0; i < numSplits - 1; i++) {
+        splits[i] = new SolrInputSplit(currentDoc, numDocsPerSplit);
+        currentDoc += numDocsPerSplit;
+      }
+      splits[splits.length - 1] = new SolrInputSplit(currentDoc, numResults - currentDoc);
+
+      return splits;
+    }
+
+    public RecordReader<Text, SolrRecord> getRecordReader(final InputSplit split,
+        final JobConf job, 
+        Reporter reporter)
+        throws IOException {
+
+      SolrServer solr = new CommonsHttpSolrServer(job.get(SolrConstants.SERVER_URL));
+      SolrInputSplit solrSplit = (SolrInputSplit) split;
+      final int numDocs = solrSplit.getNumDocs();
+      
+      SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY);
+      solrQuery.setFields(SolrConstants.ID_FIELD, SolrConstants.BOOST_FIELD,
+                          SolrConstants.TIMESTAMP_FIELD,
+                          SolrConstants.DIGEST_FIELD);
+      solrQuery.setStart(solrSplit.getDocBegin());
+      solrQuery.setRows(numDocs);
+
+      QueryResponse response;
+      try {
+        response = solr.query(solrQuery);
+      } catch (final SolrServerException e) {
+        throw new IOException(e);
+      }
+
+      final SolrDocumentList solrDocs = response.getResults();
+
+      return new RecordReader<Text, SolrRecord>() {
+
+        private int currentDoc = 0;
+
+        @Override
+        public void close() throws IOException { }
+
+        @Override
+        public Text createKey() {
+          return new Text();
+        }
+
+        @Override
+        public SolrRecord createValue() {
+          return new SolrRecord();
+        }
+
+        @Override
+        public long getPos() throws IOException {
+          return currentDoc;
+        }
+
+        @Override
+        public float getProgress() throws IOException {
+          return currentDoc / (float) numDocs;
+        }
+
+        @Override
+        public boolean next(Text key, SolrRecord value) throws IOException {
+          if (currentDoc >= numDocs) {
+            return false;
+          }
+
+          SolrDocument doc = solrDocs.get(currentDoc);
+          String digest = (String) doc.getFieldValue(SolrConstants.DIGEST_FIELD);
+          key.set(digest);
+          value.readSolrDocument(doc);
+
+          currentDoc++;
+          return true;
+        }    
+      };
+    }
+  }
+
+  private Configuration conf;
+
+  private SolrServer solr;
+
+  private int numDeletes = 0;
+
+  private UpdateRequest updateRequest = new UpdateRequest();
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void configure(JobConf job) {
+    try {
+      solr = new CommonsHttpSolrServer(job.get(SolrConstants.SERVER_URL));
+    } catch (MalformedURLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+
+  @Override
+  public void close() throws IOException {
+    try {
+      if (numDeletes > 0) {
+        updateRequest.process(solr);
+      }
+      solr.optimize();
+    } catch (SolrServerException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void reduce(Text key, Iterator<SolrRecord> values,
+      OutputCollector<Text, SolrRecord> output, Reporter reporter)
+  throws IOException {
+    SolrRecord recordToKeep = values.next();
+    while (values.hasNext()) {
+      SolrRecord solrRecord = values.next();
+      if (solrRecord.getBoost() > recordToKeep.getBoost() ||
+          (solrRecord.getBoost() == recordToKeep.getBoost() && 
+              solrRecord.getTstamp() > recordToKeep.getTstamp())) {
+        updateRequest.deleteById(recordToKeep.id);
+        recordToKeep = solrRecord;
+      } else {
+        updateRequest.deleteById(solrRecord.id);
+      }
+      numDeletes++;
+      if (numDeletes >= NUM_MAX_DELETE_REQUEST) {
+        try {
+          updateRequest.process(solr);
+        } catch (SolrServerException e) {
+          throw new IOException(e);
+        }
+        updateRequest = new UpdateRequest();
+        numDeletes = 0;
+      }
+    }
+  }
+
+  public void dedup(String solrUrl) throws IOException {
+    LOG.info("SolrDeleteDuplicates: starting...");
+    LOG.info("SolrDeleteDuplicates: Solr url: " + solrUrl);
+    
+    JobConf job = new NutchJob(getConf());
+
+    job.set(SolrConstants.SERVER_URL, solrUrl);
+    job.setInputFormat(SolrInputFormat.class);
+    job.setOutputFormat(NullOutputFormat.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(SolrRecord.class);
+    job.setMapperClass(IdentityMapper.class);
+    job.setReducerClass(SolrDeleteDuplicates.class);
+
+    JobClient.runJob(job);
+    
+    LOG.info("SolrDeleteDuplicates: done.");
+  }
+
+  public int run(String[] args) throws IOException {
+    if (args.length != 1) {
+      System.err.println("Usage: SolrDeleteDuplicates <solr url>");
+      return 1;
+    }
+
+    dedup(args[0]);
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int result = ToolRunner.run(NutchConfiguration.create(),
+        new SolrDeleteDuplicates(), args);
+    System.exit(result);
+  }
+
+}
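
For reference, the duplicate-resolution rule implemented in reduce() above can be
restated as a small standalone sketch (illustrative only; this helper method is
not part of the patch):

  // Given two SolrRecords that share a digest, return the one to keep:
  // the higher boost wins, and on equal boosts the newer timestamp wins.
  static SolrRecord keep(SolrRecord a, SolrRecord b) {
    if (b.getBoost() > a.getBoost()
        || (b.getBoost() == a.getBoost() && b.getTstamp() > a.getTstamp())) {
      return b;
    }
    return a;
  }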

