Author: cutting
Date: Tue May 24 13:43:58 2005
New Revision: 178278

URL: http://svn.apache.org/viewcvs?rev=178278&view=rev
Log:
Add MapReduce-based fetch list generator.

Added:
    
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
    
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/PartitionUrlByHost.java
      - copied, changed from r178047, 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBPartitioner.java
Removed:
    
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBPartitioner.java
Modified:
    
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java
    
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java
    
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java
    
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java
    
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Partitioner.java
    
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/HashPartitioner.java
    
incubator/nutch/branches/mapred/src/java/org/apache/nutch/util/NutchConf.java

Modified: 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java
URL: 
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java?rev=178278&r1=178277&r2=178278&view=diff
==============================================================================
--- 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java
 (original)
+++ 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java
 Tue May 24 13:43:58 2005
@@ -31,12 +31,16 @@
   public void reduce(WritableComparable key, Iterator values,
                      OutputCollector output) throws IOException {
     // collect datum with the highest status
-    CrawlDatum datum = null;
+    CrawlDatum result = null;
+    int linkCount = 0;
     while (values.hasNext()) {
-      CrawlDatum nextDatum = (CrawlDatum)values.next();
-      if (datum == null || nextDatum.getStatus() > datum.getStatus())
-        datum = nextDatum;
+      CrawlDatum datum = (CrawlDatum)values.next();
+      linkCount += datum.getLinkCount();          // sum link counts
+                                                  // keep w/ max status
+      if (result == null || datum.getStatus() > result.getStatus())
+        result = datum;
     }
-    output.collect(key, datum);
+    result.setLinkCount(linkCount);
+    output.collect(key, result);
   }
 }

Modified: 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java
URL: 
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java?rev=178278&r1=178277&r2=178278&view=diff
==============================================================================
--- 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java 
(original)
+++ 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java 
Tue May 24 13:43:58 2005
@@ -24,7 +24,7 @@
 import org.apache.nutch.util.*;
 
 /* The crawl state of a url. */
-public class CrawlDatum implements Writable, Cloneable {
+public class CrawlDatum implements WritableComparable, Cloneable {
   private final static byte CUR_VERSION = 0;
 
   public static final byte STATUS_DB_UNFETCHED = 0;
@@ -38,12 +38,15 @@
   private long nextFetch = System.currentTimeMillis();
   private byte retries;
   private float fetchInterval;
+  private int linkCount;
 
   public CrawlDatum() {}
 
   public CrawlDatum(int status, float fetchInterval) {
     this.status = (byte)status;
     this.fetchInterval = fetchInterval;
+    if (status == STATUS_LINKED)
+      linkCount = 1;
   }
 
   //
@@ -64,6 +67,9 @@
     this.fetchInterval = fetchInterval;
   }
 
+  public int getLinkCount() { return linkCount; }
+  public void setLinkCount(int linkCount) { this.linkCount = linkCount; }
+
   //
   // writable methods
   //
@@ -77,14 +83,19 @@
     nextFetch = in.readLong();
     retries = in.readByte();
     fetchInterval = in.readFloat();
+    linkCount = in.readInt();
   }
 
+  /** The number of bytes into a CrawlDatum that the linkCount is stored. */
+  private static final int LINK_COUNT_OFFSET = 1 + 1 + 8 + 1 + 4;
+
   public void write(DataOutput out) throws IOException {
     out.writeByte(CUR_VERSION);                   // store current version
-    out.write(status);
+    out.writeByte(status);
     out.writeLong(nextFetch);
-    out.write(retries);
+    out.writeByte(retries);
     out.writeFloat(fetchInterval);
+    out.writeInt(linkCount);
   }
 
   /** Copy the contents of another instance into this instance. */
@@ -93,8 +104,37 @@
     this.nextFetch = that.nextFetch;
     this.retries = that.retries;
     this.fetchInterval = that.fetchInterval;
+    this.linkCount = that.linkCount;
+  }
+
+
+  //
+  // compare methods
+  //
+  
+  /** Sort by decreasing link count. */
+  public int compareTo(Object o) {
+    int thisLinkCount = this.linkCount;
+    int thatLinkCount = ((CrawlDatum)o).linkCount;
+    return thatLinkCount - thisLinkCount;
   }
 
+  /** A Comparator optimized for CrawlDatum. */ 
+  public static class Comparator extends WritableComparator {
+    public Comparator() { super(CrawlDatum.class); }
+
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      int linkCount1 = readInt(b1,s1+LINK_COUNT_OFFSET);
+      int linkCount2 = readInt(b2,s2+LINK_COUNT_OFFSET);
+      return linkCount2 - linkCount1;
+    }
+  }
+
+  static {                                        // register this comparator
+    WritableComparator.define(CrawlDatum.class, new Comparator());
+  }
+
+
   //
   // basic methods
   //
@@ -106,6 +146,7 @@
     buf.append("Next fetch: " + new Date(getNextFetchTime()) + "\n");
     buf.append("Retries since fetch: " + getRetriesSinceFetch() + "\n");
     buf.append("Retry interval: " + getFetchInterval() + " days\n");
+    buf.append("Link Count: " + getLinkCount() + "\n");
     return buf.toString();
   }
 
@@ -117,7 +158,8 @@
       (this.status == other.status) &&
       (this.nextFetch == other.nextFetch) &&
       (this.retries == other.retries) &&
-      (this.fetchInterval == other.fetchInterval);
+      (this.fetchInterval == other.fetchInterval) &&
+      (this.linkCount == other.linkCount);
   }
 
   public int hashCode() {
@@ -125,7 +167,8 @@
       status ^
       ((int)nextFetch) ^
       retries ^
-      Float.floatToIntBits(fetchInterval);
+      Float.floatToIntBits(fetchInterval) ^
+      linkCount;
   }
 
   public Object clone() {

Added: 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
URL: 
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java?rev=178278&view=auto
==============================================================================
--- 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java 
(added)
+++ 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java 
Tue May 24 13:43:58 2005
@@ -0,0 +1,183 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.text.*;
+import java.util.logging.*;
+
+import org.apache.nutch.io.*;
+import org.apache.nutch.fs.*;
+import org.apache.nutch.util.*;
+import org.apache.nutch.mapred.*;
+import org.apache.nutch.mapred.lib.*;
+
+/** Generates a subset of a crawl db to fetch. */
+public class Generator extends NutchConfigured {
+
+  public static final Logger LOG =
+    LogFormatter.getLogger("org.apache.nutch.crawl.Generator");
+
+  private File dbDir;
+  private boolean refetchOnly;
+
+  /** Selects entries due for fetch. */
+  public static class Selector implements Mapper, Partitioner, Reducer {
+    private long curTime;
+    private boolean refetchOnly;
+    private long limit;
+    private long count;
+
+    public void configure(JobConf job) {
+      curTime = job.getLong("crawl.gen.curTime", System.currentTimeMillis());
+      refetchOnly = job.getBoolean("crawl.gen.refetchOnly", false);
+      limit = job.getLong("crawl.gen.limit", Long.MAX_VALUE);
+    }
+
+    /** Select & invert subset due for fetch. */
+    public void map(WritableComparable key, Writable value,
+                    OutputCollector output) throws IOException {
+      CrawlDatum crawlDatum = (CrawlDatum)value;
+
+      if (crawlDatum.getNextFetchTime() > curTime)
+        return;
+
+      if (refetchOnly
+          && crawlDatum.getStatus() == CrawlDatum.STATUS_DB_UNFETCHED)
+        return;
+
+      output.collect(crawlDatum, key);          // invert for sort by linkCount
+    }
+
+    /** Hash urls to randomize link counts accross partitions. */
+    public int getPartition(WritableComparable key, Writable value,
+                            int numReduceTasks) {
+      return (value.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
+    }
+
+    /** Collect until limit is reached. */
+    public void reduce(WritableComparable key, Iterator values,
+                       OutputCollector output) throws IOException {
+      while (values.hasNext() && ++count < limit) {
+        output.collect(key, (Writable)values.next());
+      }
+
+    }
+
+  }
+
+  /** Construct a generator. */
+  public Generator(NutchConf conf, File dbDir) {
+    super(conf);
+    this.dbDir = dbDir;
+  }
+
+  /** Generate fetchlists. */
+  public void generate(File dir, int numLists, long topN, long curTime,
+                       boolean refetchOnly)
+    throws IOException {
+
+    File tempDir =
+      new File("generate-temp-"+
+               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    // map to inverted subset due for fetch
+    JobConf job = new JobConf(getConf());
+    
+    job.setLong("crawl.gen.curTime", curTime);
+    job.setBoolean("crawl.gen.refetchOnly", refetchOnly);
+    job.setLong("crawl.gen.limit", topN / job.getNumReduceTasks());
+
+    job.setInputDir(new File(dbDir, "current"));
+    job.setInputFormat(InputFormats.get("seq"));
+    job.setInputKeyClass(UTF8.class);
+    job.setInputValueClass(CrawlDatum.class);
+
+    job.setMapperClass(Selector.class);
+    job.setPartitionerClass(Selector.class);
+    job.setReducerClass(Selector.class);
+
+    job.setOutputDir(tempDir);
+    job.setOutputFormat(OutputFormats.get("seq"));
+    job.setOutputKeyClass(CrawlDatum.class);
+    job.setOutputValueClass(UTF8.class);
+    JobClient.runJob(job);
+
+    // invert again and paritition by host
+    job = new JobConf(getConf());
+    
+    job.setInt("partition.url.by.host.seed", new Random().nextInt());
+
+    job.setInputDir(tempDir);
+    job.setInputFormat(InputFormats.get("seq"));
+    job.setInputKeyClass(CrawlDatum.class);
+    job.setInputValueClass(UTF8.class);
+
+    job.setMapperClass(InverseMapper.class);
+    job.setPartitionerClass(PartitionUrlByHost.class);
+    job.setNumReduceTasks(numLists);
+
+    job.setOutputDir(dir);
+    job.setOutputFormat(OutputFormats.get("seq"));
+    job.setOutputKeyClass(UTF8.class);
+    job.setOutputValueClass(CrawlDatum.class);
+    JobClient.runJob(job);
+
+    new JobClient(getConf()).getFs().delete(tempDir);
+  }
+
+  /**
+   * Generate a fetchlist from the pagedb and linkdb
+   */
+  public static void main(String args[]) throws Exception {
+    if (args.length < 2) {
+      System.out.println("Usage: Generator <crawldb> <segments_dir> 
[-refetchonly] [-topN N] [-numFetchers numFetchers] [-adddays numDays]");
+      return;
+    }
+
+    File dbDir = new File(args[0]);
+    File segmentsDir = new File(args[1]);
+    long curTime = System.currentTimeMillis();
+    boolean refetchOnly = false;
+    long topN = Long.MAX_VALUE;
+    int numFetchers = 1;
+
+    for (int i = 2; i < args.length; i++) {
+      if ("-refetchonly".equals(args[i])) {
+        refetchOnly = true;
+      } else if ("-topN".equals(args[i])) {
+        topN = Long.parseLong(args[i+1]);
+        i++;
+      } else if ("-numFetchers".equals(args[i])) {
+        numFetchers = Integer.parseInt(args[i+1]);
+        i++;
+      } else if ("-adddays".equals(args[i])) {
+        long numDays = Integer.parseInt(args[i+1]);
+        curTime += numDays * 1000L * 60 * 60 * 24;
+      }
+    }
+
+    LOG.info("Generator started");
+    if (topN != Long.MAX_VALUE)
+      LOG.info("topN:" + topN);
+    Generator gen = new Generator(NutchConf.get(), dbDir);
+    gen.generate(segmentsDir, numFetchers, topN, curTime, refetchOnly);
+    LOG.info("Generator completed");
+  }
+}

Modified: 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java
URL: 
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java?rev=178278&r1=178277&r2=178278&view=diff
==============================================================================
--- 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java 
(original)
+++ 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java 
Tue May 24 13:43:58 2005
@@ -88,12 +88,13 @@
                Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
     JobConf mergeJob = new JobConf(getConf());
     mergeJob.addInputDir(tempDir);
-    mergeJob.addInputDir(crawlDb);
+    mergeJob.addInputDir(new File(crawlDb, "current/"));
     mergeJob.setInputFormat(InputFormats.get("seq"));
     mergeJob.setInputKeyClass(UTF8.class);
     mergeJob.setInputValueClass(CrawlDatum.class);
 
-    mergeJob.setPartitionerClass(CrawlDBPartitioner.class);
+    mergeJob.setInt("partition.url.by.host.seed", new Random().nextInt());
+    mergeJob.setPartitionerClass(PartitionUrlByHost.class);
     mergeJob.setReducerClass(CrawlDBReducer.class);
 
     mergeJob.setOutputDir(newCrawlDb);
@@ -103,7 +104,12 @@
 
     JobClient.runJob(mergeJob);
 
-    new JobClient(getConf()).getFs().delete(tempDir);
+    NutchFileSystem fs = new JobClient(getConf()).getFs();
+    fs.delete(new File(crawlDb, "old/"));
+    fs.rename(new File(crawlDb, "current/"), new File(crawlDb, "old/"));
+    fs.rename(newCrawlDb, new File(crawlDb, "current/"));
+    fs.delete(new File(crawlDb, "old/"));
+    fs.delete(tempDir);
 
   }
 

Copied: 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/PartitionUrlByHost.java
 (from r178047, 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBPartitioner.java)
URL: 
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/PartitionUrlByHost.java?p2=incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/PartitionUrlByHost.java&p1=incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBPartitioner.java&r1=178047&r2=178278&rev=178278&view=diff
==============================================================================
--- 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBPartitioner.java
 (original)
+++ 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/PartitionUrlByHost.java
 Tue May 24 13:43:58 2005
@@ -23,12 +23,16 @@
 import org.apache.nutch.mapred.*;
 
 /** Partition urls by hostname. */
-public class CrawlDBPartitioner implements Partitioner {
-  
-  public void configure(JobConf job) {}
+public class PartitionUrlByHost implements Partitioner {
+  private int seed;
+
+  public void configure(JobConf job) {
+    seed = job.getInt("partition.url.by.host.seed", 0);
+  }
   
   /** Hash by hostname. */
-  public int getPartition(WritableComparable key, int numReduceTasks) {
+  public int getPartition(WritableComparable key, Writable value,
+                          int numReduceTasks) {
     String urlString = ((UTF8)key).toString();
     URL url = null;
     try {
@@ -36,6 +40,9 @@
     } catch (MalformedURLException e) {
     }
     int hashCode = (url==null ? urlString : url.getHost()).hashCode();
+
+    // make hosts wind up in different partitions on different runs
+    hashCode ^= seed;
 
     return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
   }

Modified: 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java
URL: 
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java?rev=178278&r1=178277&r2=178278&view=diff
==============================================================================
--- 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java 
(original)
+++ 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java 
Tue May 24 13:43:58 2005
@@ -75,7 +75,8 @@
           public synchronized void collect(WritableComparable key,
                                            Writable value)
             throws IOException {
-            outs[partitioner.getPartition(key, partitions)].append(key, value);
+            outs[partitioner.getPartition(key, value, partitions)]
+              .append(key, value);
           }
         };
 

Modified: 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Partitioner.java
URL: 
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Partitioner.java?rev=178278&r1=178277&r2=178278&view=diff
==============================================================================
--- 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Partitioner.java
 (original)
+++ 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Partitioner.java
 Tue May 24 13:43:58 2005
@@ -21,12 +21,13 @@
 
 /** Partitions the key space.  A partition is created for each reduce task. */
 public interface Partitioner extends Configurable {
-  /** Returns the paritition number for a given key given the total number of
+  /** Returns the paritition number for a given entry given the total number of
    * partitions.  Typically a hash function on a all or a subset of the key.
    *
-   * @param key the key
+   * @param key the entry key
+   * @param value the entry value
    * @param numPartitions the number of partitions
    * @return the partition number
    */
-  int getPartition(WritableComparable key, int numPartitions);
+  int getPartition(WritableComparable key, Writable value, int numPartitions);
 }

Modified: 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/HashPartitioner.java
URL: 
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/HashPartitioner.java?rev=178278&r1=178277&r2=178278&view=diff
==============================================================================
--- 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/HashPartitioner.java
 (original)
+++ 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/HashPartitioner.java
 Tue May 24 13:43:58 2005
@@ -20,6 +20,7 @@
 import org.apache.nutch.mapred.JobConf;
 
 import org.apache.nutch.io.WritableComparable;
+import org.apache.nutch.io.Writable;
 
 /** Partition keys by their [EMAIL PROTECTED] Object#hashCode()}. */
 public class HashPartitioner implements Partitioner {
@@ -27,7 +28,8 @@
   public void configure(JobConf job) {}
 
   /** Use [EMAIL PROTECTED] Object#hashCode()} to partition. */
-  public int getPartition(WritableComparable key, int numReduceTasks) {
+  public int getPartition(WritableComparable key, Writable value,
+                          int numReduceTasks) {
     return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
   }
 

Modified: 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/util/NutchConf.java
URL: 
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/util/NutchConf.java?rev=178278&r1=178277&r2=178278&view=diff
==============================================================================
--- 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/util/NutchConf.java 
(original)
+++ 
incubator/nutch/branches/mapred/src/java/org/apache/nutch/util/NutchConf.java 
Tue May 24 13:43:58 2005
@@ -138,6 +138,11 @@
     }
   }
 
+  /** Sets the value of the <code>name</code> property to a long. */
+  public void setLong(String name, long value) {
+    set(name, Long.toString(value));
+  }
+
   /** Returns the value of the <code>name</code> property as a float.  If no
    * such property is specified, or if the specified value is not a valid
    * float, then <code>defaultValue</code> is returned.
@@ -165,6 +170,11 @@
     else if ("false".equals(valueString))
       return false;
     else return defaultValue;
+  }
+
+  /** Sets the value of the <code>name</code> property to an integer. */
+  public void setBoolean(String name, boolean value) {
+    set(name, Boolean.toString(value));
   }
 
   /** Returns the value of the <code>name</code> property as an array of




-------------------------------------------------------
This SF.Net email is sponsored by Yahoo.
Introducing Yahoo! Search Developer Network - Create apps using Yahoo!
Search APIs Find out how you can build Yahoo! directly into your own
Applications - visit http://developer.yahoo.net/?fr=offad-ysdn-ostg-q22005
_______________________________________________
Nutch-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/nutch-cvs

Reply via email to