Author: cutting
Date: Tue May 24 13:43:58 2005
New Revision: 178278
URL: http://svn.apache.org/viewcvs?rev=178278&view=rev
Log:
Add MapReduce-based fetch list generator.
Added:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/PartitionUrlByHost.java
- copied, changed from r178047,
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBPartitioner.java
Removed:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBPartitioner.java
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Partitioner.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/HashPartitioner.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/util/NutchConf.java
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java?rev=178278&r1=178277&r2=178278&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java
Tue May 24 13:43:58 2005
@@ -31,12 +31,16 @@
public void reduce(WritableComparable key, Iterator values,
OutputCollector output) throws IOException {
// collect datum with the highest status
- CrawlDatum datum = null;
+ CrawlDatum result = null;
+ int linkCount = 0;
while (values.hasNext()) {
- CrawlDatum nextDatum = (CrawlDatum)values.next();
- if (datum == null || nextDatum.getStatus() > datum.getStatus())
- datum = nextDatum;
+ CrawlDatum datum = (CrawlDatum)values.next();
+ linkCount += datum.getLinkCount(); // sum link counts
+ // keep w/ max status
+ if (result == null || datum.getStatus() > result.getStatus())
+ result = datum;
}
- output.collect(key, datum);
+ result.setLinkCount(linkCount);
+ output.collect(key, result);
}
}
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java?rev=178278&r1=178277&r2=178278&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java
Tue May 24 13:43:58 2005
@@ -24,7 +24,7 @@
import org.apache.nutch.util.*;
/* The crawl state of a url. */
-public class CrawlDatum implements Writable, Cloneable {
+public class CrawlDatum implements WritableComparable, Cloneable {
private final static byte CUR_VERSION = 0;
public static final byte STATUS_DB_UNFETCHED = 0;
@@ -38,12 +38,15 @@
private long nextFetch = System.currentTimeMillis();
private byte retries;
private float fetchInterval;
+ private int linkCount;
public CrawlDatum() {}
public CrawlDatum(int status, float fetchInterval) {
this.status = (byte)status;
this.fetchInterval = fetchInterval;
+ if (status == STATUS_LINKED)
+ linkCount = 1;
}
//
@@ -64,6 +67,9 @@
this.fetchInterval = fetchInterval;
}
+ public int getLinkCount() { return linkCount; }
+ public void setLinkCount(int linkCount) { this.linkCount = linkCount; }
+
//
// writable methods
//
@@ -77,14 +83,19 @@
nextFetch = in.readLong();
retries = in.readByte();
fetchInterval = in.readFloat();
+ linkCount = in.readInt();
}
+ /** The byte offset within a serialized CrawlDatum at which the linkCount is stored. */
+ private static final int LINK_COUNT_OFFSET = 1 + 1 + 8 + 1 + 4;
+
public void write(DataOutput out) throws IOException {
out.writeByte(CUR_VERSION); // store current version
- out.write(status);
+ out.writeByte(status);
out.writeLong(nextFetch);
- out.write(retries);
+ out.writeByte(retries);
out.writeFloat(fetchInterval);
+ out.writeInt(linkCount);
}
/** Copy the contents of another instance into this instance. */
@@ -93,8 +104,37 @@
this.nextFetch = that.nextFetch;
this.retries = that.retries;
this.fetchInterval = that.fetchInterval;
+ this.linkCount = that.linkCount;
+ }
+
+
+ //
+ // compare methods
+ //
+
+ /** Sort by decreasing link count. */
+ public int compareTo(Object o) {
+ int thisLinkCount = this.linkCount;
+ int thatLinkCount = ((CrawlDatum)o).linkCount;
+ return thatLinkCount - thisLinkCount;
}
+ /** A Comparator optimized for CrawlDatum. */
+ public static class Comparator extends WritableComparator {
+ public Comparator() { super(CrawlDatum.class); }
+
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ int linkCount1 = readInt(b1,s1+LINK_COUNT_OFFSET);
+ int linkCount2 = readInt(b2,s2+LINK_COUNT_OFFSET);
+ return linkCount2 - linkCount1;
+ }
+ }
+
+ static { // register this comparator
+ WritableComparator.define(CrawlDatum.class, new Comparator());
+ }
+
+
//
// basic methods
//
@@ -106,6 +146,7 @@
buf.append("Next fetch: " + new Date(getNextFetchTime()) + "\n");
buf.append("Retries since fetch: " + getRetriesSinceFetch() + "\n");
buf.append("Retry interval: " + getFetchInterval() + " days\n");
+ buf.append("Link Count: " + getLinkCount() + "\n");
return buf.toString();
}
@@ -117,7 +158,8 @@
(this.status == other.status) &&
(this.nextFetch == other.nextFetch) &&
(this.retries == other.retries) &&
- (this.fetchInterval == other.fetchInterval);
+ (this.fetchInterval == other.fetchInterval) &&
+ (this.linkCount == other.linkCount);
}
public int hashCode() {
@@ -125,7 +167,8 @@
status ^
((int)nextFetch) ^
retries ^
- Float.floatToIntBits(fetchInterval);
+ Float.floatToIntBits(fetchInterval) ^
+ linkCount;
}
public Object clone() {
Added:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java?rev=178278&view=auto
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
(added)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
Tue May 24 13:43:58 2005
@@ -0,0 +1,183 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.text.*;
+import java.util.logging.*;
+
+import org.apache.nutch.io.*;
+import org.apache.nutch.fs.*;
+import org.apache.nutch.util.*;
+import org.apache.nutch.mapred.*;
+import org.apache.nutch.mapred.lib.*;
+
+/** Generates a subset of a crawl db to fetch. */
+public class Generator extends NutchConfigured {
+
+ public static final Logger LOG =
+ LogFormatter.getLogger("org.apache.nutch.crawl.Generator");
+
+ private File dbDir;
+ private boolean refetchOnly;
+
+ /** Selects entries due for fetch. */
+ public static class Selector implements Mapper, Partitioner, Reducer {
+ private long curTime;
+ private boolean refetchOnly;
+ private long limit;
+ private long count;
+
+ public void configure(JobConf job) {
+ curTime = job.getLong("crawl.gen.curTime", System.currentTimeMillis());
+ refetchOnly = job.getBoolean("crawl.gen.refetchOnly", false);
+ limit = job.getLong("crawl.gen.limit", Long.MAX_VALUE);
+ }
+
+ /** Select & invert subset due for fetch. */
+ public void map(WritableComparable key, Writable value,
+ OutputCollector output) throws IOException {
+ CrawlDatum crawlDatum = (CrawlDatum)value;
+
+ if (crawlDatum.getNextFetchTime() > curTime)
+ return;
+
+ if (refetchOnly
+ && crawlDatum.getStatus() == CrawlDatum.STATUS_DB_UNFETCHED)
+ return;
+
+ output.collect(crawlDatum, key); // invert for sort by linkCount
+ }
+
+ /** Hash urls to randomize link counts across partitions. */
+ public int getPartition(WritableComparable key, Writable value,
+ int numReduceTasks) {
+ return (value.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
+ }
+
+ /** Collect until limit is reached. */
+ public void reduce(WritableComparable key, Iterator values,
+ OutputCollector output) throws IOException {
+ while (values.hasNext() && ++count < limit) {
+ output.collect(key, (Writable)values.next());
+ }
+
+ }
+
+ }
+
+ /** Construct a generator. */
+ public Generator(NutchConf conf, File dbDir) {
+ super(conf);
+ this.dbDir = dbDir;
+ }
+
+ /** Generate fetchlists. */
+ public void generate(File dir, int numLists, long topN, long curTime,
+ boolean refetchOnly)
+ throws IOException {
+
+ File tempDir =
+ new File("generate-temp-"+
+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+ // map to inverted subset due for fetch
+ JobConf job = new JobConf(getConf());
+
+ job.setLong("crawl.gen.curTime", curTime);
+ job.setBoolean("crawl.gen.refetchOnly", refetchOnly);
+ job.setLong("crawl.gen.limit", topN / job.getNumReduceTasks());
+
+ job.setInputDir(new File(dbDir, "current"));
+ job.setInputFormat(InputFormats.get("seq"));
+ job.setInputKeyClass(UTF8.class);
+ job.setInputValueClass(CrawlDatum.class);
+
+ job.setMapperClass(Selector.class);
+ job.setPartitionerClass(Selector.class);
+ job.setReducerClass(Selector.class);
+
+ job.setOutputDir(tempDir);
+ job.setOutputFormat(OutputFormats.get("seq"));
+ job.setOutputKeyClass(CrawlDatum.class);
+ job.setOutputValueClass(UTF8.class);
+ JobClient.runJob(job);
+
+ // invert again and partition by host
+ job = new JobConf(getConf());
+
+ job.setInt("partition.url.by.host.seed", new Random().nextInt());
+
+ job.setInputDir(tempDir);
+ job.setInputFormat(InputFormats.get("seq"));
+ job.setInputKeyClass(CrawlDatum.class);
+ job.setInputValueClass(UTF8.class);
+
+ job.setMapperClass(InverseMapper.class);
+ job.setPartitionerClass(PartitionUrlByHost.class);
+ job.setNumReduceTasks(numLists);
+
+ job.setOutputDir(dir);
+ job.setOutputFormat(OutputFormats.get("seq"));
+ job.setOutputKeyClass(UTF8.class);
+ job.setOutputValueClass(CrawlDatum.class);
+ JobClient.runJob(job);
+
+ new JobClient(getConf()).getFs().delete(tempDir);
+ }
+
+ /**
+ * Generate a fetchlist from the pagedb and linkdb
+ */
+ public static void main(String args[]) throws Exception {
+ if (args.length < 2) {
+ System.out.println("Usage: Generator <crawldb> <segments_dir>
[-refetchonly] [-topN N] [-numFetchers numFetchers] [-adddays numDays]");
+ return;
+ }
+
+ File dbDir = new File(args[0]);
+ File segmentsDir = new File(args[1]);
+ long curTime = System.currentTimeMillis();
+ boolean refetchOnly = false;
+ long topN = Long.MAX_VALUE;
+ int numFetchers = 1;
+
+ for (int i = 2; i < args.length; i++) {
+ if ("-refetchonly".equals(args[i])) {
+ refetchOnly = true;
+ } else if ("-topN".equals(args[i])) {
+ topN = Long.parseLong(args[i+1]);
+ i++;
+ } else if ("-numFetchers".equals(args[i])) {
+ numFetchers = Integer.parseInt(args[i+1]);
+ i++;
+ } else if ("-adddays".equals(args[i])) {
+ long numDays = Integer.parseInt(args[i+1]);
+ curTime += numDays * 1000L * 60 * 60 * 24;
+ }
+ }
+
+ LOG.info("Generator started");
+ if (topN != Long.MAX_VALUE)
+ LOG.info("topN:" + topN);
+ Generator gen = new Generator(NutchConf.get(), dbDir);
+ gen.generate(segmentsDir, numFetchers, topN, curTime, refetchOnly);
+ LOG.info("Generator completed");
+ }
+}
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java?rev=178278&r1=178277&r2=178278&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java
Tue May 24 13:43:58 2005
@@ -88,12 +88,13 @@
Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
JobConf mergeJob = new JobConf(getConf());
mergeJob.addInputDir(tempDir);
- mergeJob.addInputDir(crawlDb);
+ mergeJob.addInputDir(new File(crawlDb, "current/"));
mergeJob.setInputFormat(InputFormats.get("seq"));
mergeJob.setInputKeyClass(UTF8.class);
mergeJob.setInputValueClass(CrawlDatum.class);
- mergeJob.setPartitionerClass(CrawlDBPartitioner.class);
+ mergeJob.setInt("partition.url.by.host.seed", new Random().nextInt());
+ mergeJob.setPartitionerClass(PartitionUrlByHost.class);
mergeJob.setReducerClass(CrawlDBReducer.class);
mergeJob.setOutputDir(newCrawlDb);
@@ -103,7 +104,12 @@
JobClient.runJob(mergeJob);
- new JobClient(getConf()).getFs().delete(tempDir);
+ NutchFileSystem fs = new JobClient(getConf()).getFs();
+ fs.delete(new File(crawlDb, "old/"));
+ fs.rename(new File(crawlDb, "current/"), new File(crawlDb, "old/"));
+ fs.rename(newCrawlDb, new File(crawlDb, "current/"));
+ fs.delete(new File(crawlDb, "old/"));
+ fs.delete(tempDir);
}
Copied:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/PartitionUrlByHost.java
(from r178047,
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBPartitioner.java)
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/PartitionUrlByHost.java?p2=incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/PartitionUrlByHost.java&p1=incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBPartitioner.java&r1=178047&r2=178278&rev=178278&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBPartitioner.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/PartitionUrlByHost.java
Tue May 24 13:43:58 2005
@@ -23,12 +23,16 @@
import org.apache.nutch.mapred.*;
/** Partition urls by hostname. */
-public class CrawlDBPartitioner implements Partitioner {
-
- public void configure(JobConf job) {}
+public class PartitionUrlByHost implements Partitioner {
+ private int seed;
+
+ public void configure(JobConf job) {
+ seed = job.getInt("partition.url.by.host.seed", 0);
+ }
/** Hash by hostname. */
- public int getPartition(WritableComparable key, int numReduceTasks) {
+ public int getPartition(WritableComparable key, Writable value,
+ int numReduceTasks) {
String urlString = ((UTF8)key).toString();
URL url = null;
try {
@@ -36,6 +40,9 @@
} catch (MalformedURLException e) {
}
int hashCode = (url==null ? urlString : url.getHost()).hashCode();
+
+ // make hosts wind up in different partitions on different runs
+ hashCode ^= seed;
return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
}
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java?rev=178278&r1=178277&r2=178278&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java
Tue May 24 13:43:58 2005
@@ -75,7 +75,8 @@
public synchronized void collect(WritableComparable key,
Writable value)
throws IOException {
- outs[partitioner.getPartition(key, partitions)].append(key, value);
+ outs[partitioner.getPartition(key, value, partitions)]
+ .append(key, value);
}
};
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Partitioner.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Partitioner.java?rev=178278&r1=178277&r2=178278&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Partitioner.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Partitioner.java
Tue May 24 13:43:58 2005
@@ -21,12 +21,13 @@
/** Partitions the key space. A partition is created for each reduce task. */
public interface Partitioner extends Configurable {
- /** Returns the paritition number for a given key given the total number of
+ /** Returns the partition number for a given entry given the total number of
* partitions. Typically a hash function on all or a subset of the key.
*
- * @param key the key
+ * @param key the entry key
+ * @param value the entry value
* @param numPartitions the number of partitions
* @return the partition number
*/
- int getPartition(WritableComparable key, int numPartitions);
+ int getPartition(WritableComparable key, Writable value, int numPartitions);
}
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/HashPartitioner.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/HashPartitioner.java?rev=178278&r1=178277&r2=178278&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/HashPartitioner.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/HashPartitioner.java
Tue May 24 13:43:58 2005
@@ -20,6 +20,7 @@
import org.apache.nutch.mapred.JobConf;
import org.apache.nutch.io.WritableComparable;
+import org.apache.nutch.io.Writable;
/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner implements Partitioner {
@@ -27,7 +28,8 @@
public void configure(JobConf job) {}
/** Use {@link Object#hashCode()} to partition. */
- public int getPartition(WritableComparable key, int numReduceTasks) {
+ public int getPartition(WritableComparable key, Writable value,
+ int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/util/NutchConf.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/util/NutchConf.java?rev=178278&r1=178277&r2=178278&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/util/NutchConf.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/util/NutchConf.java
Tue May 24 13:43:58 2005
@@ -138,6 +138,11 @@
}
}
+ /** Sets the value of the <code>name</code> property to a long. */
+ public void setLong(String name, long value) {
+ set(name, Long.toString(value));
+ }
+
/** Returns the value of the <code>name</code> property as a float. If no
* such property is specified, or if the specified value is not a valid
* float, then <code>defaultValue</code> is returned.
@@ -165,6 +170,11 @@
else if ("false".equals(valueString))
return false;
else return defaultValue;
+ }
+
+ /** Sets the value of the <code>name</code> property to a boolean. */
+ public void setBoolean(String name, boolean value) {
+ set(name, Boolean.toString(value));
}
/** Returns the value of the <code>name</code> property as an array of
-------------------------------------------------------
This SF.Net email is sponsored by Yahoo.
Introducing Yahoo! Search Developer Network - Create apps using Yahoo!
Search APIs Find out how you can build Yahoo! directly into your own
Applications - visit http://developer.yahoo.net/?fr=offad-ysdn-ostg-q22005
_______________________________________________
Nutch-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/nutch-cvs