Author: cutting
Date: Thu May 26 15:00:07 2005
New Revision: 178695
URL: http://svn.apache.org/viewcvs?rev=178695&view=rev
Log:
First version of MapReduce-based fetcher.
Added:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutput.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConfigurable.java
- copied, changed from r178047,
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Configurable.java
Removed:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Configurable.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormats.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/OutputFormats.java
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormat.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormatBase.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Mapper.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/OutputFormat.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Partitioner.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reducer.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileOutputFormat.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextInputFormat.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextOutputFormat.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/demo/Grep.java
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java?rev=178695&r1=178694&r2=178695&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java
Thu May 26 15:00:07 2005
@@ -25,22 +25,80 @@
/** Merge new page entries with existing entries. */
public class CrawlDBReducer implements Reducer {
+ private int retryMax; // temporary failures allowed before a url is marked gone
- public void configure(JobConf job) {}
+ /** Reads the retry limit from "db.fetch.retry.max" (default 3). */
+ public void configure(JobConf job) {
+ retryMax = job.getInt("db.fetch.retry.max", 3);
+ }
+ /** Merges every entry seen for one url into at most one db entry.
+ * The entry with the highest status decides the new db status; an existing
+ * db entry (unfetched, fetched or gone) is kept when nothing newer than a
+ * link was seen. Link counts of all entries are summed into the result.
+ * Nothing is emitted when no entry qualifies. */
public void reduce(WritableComparable key, Iterator values,
OutputCollector output) throws IOException {
- // collect datum with the highest status
- CrawlDatum result = null;
+
+ CrawlDatum highest = null;
+ CrawlDatum old = null;
int linkCount = 0;
+
while (values.hasNext()) {
CrawlDatum datum = (CrawlDatum)values.next();
linkCount += datum.getLinkCount(); // sum link counts
- // keep w/ max status
- if (result == null || datum.getStatus() > result.getStatus())
- result = datum;
+
+ if (highest == null || datum.getStatus() > highest.getStatus()) {
+ highest = datum; // find highest status
+ }
+
+ switch (datum.getStatus()) { // find old entry, if any
+ case CrawlDatum.STATUS_DB_UNFETCHED:
+ case CrawlDatum.STATUS_DB_FETCHED:
+ case CrawlDatum.STATUS_DB_GONE: // gone entries are db entries too;
+ old = datum; // without this they would be dropped
+ }
+ }
+
+ if (highest == null) { // no values for this key
+ return; // nothing to merge
+ }
+
+ CrawlDatum result = null;
+
+ switch (highest.getStatus()) { // determine new status
+
+ case CrawlDatum.STATUS_DB_UNFETCHED: // no new entry
+ case CrawlDatum.STATUS_DB_FETCHED:
+ case CrawlDatum.STATUS_DB_GONE:
+ result = old; // use old
+ break;
+
+ case CrawlDatum.STATUS_LINKED: // highest was link
+ if (old != null) { // if old exists
+ result = old; // use it
+ } else {
+ result = highest; // use new entry
+ result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
+ }
+ break;
+
+ case CrawlDatum.STATUS_FETCH_SUCCESS: // successful fetch
+ result = highest; // use new entry
+ result.setStatus(CrawlDatum.STATUS_DB_FETCHED);
+ break;
+
+ case CrawlDatum.STATUS_FETCH_FAIL_TEMP: // temporary failure
+ result = highest; // use new entry
+ if (highest.getRetriesSinceFetch() < retryMax) {
+ result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
+ } else {
+ result.setStatus(CrawlDatum.STATUS_DB_GONE);
+ }
+ break;
+
+ case CrawlDatum.STATUS_FETCH_FAIL_PERM: // permanent failure
+ result = highest; // use new entry
+ result.setStatus(CrawlDatum.STATUS_DB_GONE);
+ break;
+
+ default:
+ throw new RuntimeException("Unknown status: "+highest.getStatus());
+ }
+
+ if (result != null) {
+ result.setLinkCount(linkCount);
+ output.collect(key, result);
}
- result.setLinkCount(linkCount);
- output.collect(key, result);
}
+
}
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java?rev=178695&r1=178694&r2=178695&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java
Thu May 26 15:00:07 2005
@@ -25,14 +25,17 @@
/* The crawl state of a url. */
public class CrawlDatum implements WritableComparable, Cloneable {
- private final static byte CUR_VERSION = 0;
+ public static final String DIR_NAME = "crawl";
- public static final byte STATUS_DB_UNFETCHED = 0;
- public static final byte STATUS_DB_FETCHED = 1;
- public static final byte STATUS_LINKED = 2;
- public static final byte STATUS_FETCHER_SUCCESS = 3;
- public static final byte STATUS_FETCHER_FAIL_TEMP = 4;
- public static final byte STATUS_FETCHER_FAIL_PERM = 5;
+ private final static byte CUR_VERSION = 1;
+
+ public static final byte STATUS_DB_UNFETCHED = 1;
+ public static final byte STATUS_DB_FETCHED = 2;
+ public static final byte STATUS_DB_GONE = 3;
+ public static final byte STATUS_LINKED = 4;
+ public static final byte STATUS_FETCH_SUCCESS = 5;
+ public static final byte STATUS_FETCH_FAIL_TEMP = 6;
+ public static final byte STATUS_FETCH_FAIL_PERM = 7;
private byte status;
private long nextFetch = System.currentTimeMillis();
@@ -54,7 +57,7 @@
//
public byte getStatus() { return status; }
- public void setStatus(byte status) { this.status = (byte)status; }
+ public void setStatus(int status) { this.status = (byte)status; }
public long getNextFetchTime() { return nextFetch; }
public void setNextFetchTime(long nextFetch) { this.nextFetch = nextFetch; }
@@ -73,6 +76,13 @@
//
// writable methods
//
+
+ /** Creates a fresh CrawlDatum and fills it from <code>in</code> by
+  * delegating to {@link #readFields(DataInput)}.
+  * @param in the stream to deserialize from
+  * @return the newly populated instance
+  * @throws IOException if reading the fields fails
+  */
+ public static CrawlDatum read(DataInput in) throws IOException {
+   CrawlDatum datum = new CrawlDatum();
+   datum.readFields(in);                         // populate from the stream
+   return datum;
+ }
+
public void readFields(DataInput in) throws IOException {
byte version = in.readByte(); // read version
Added:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java?rev=178695&view=auto
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
(added)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
Thu May 26 15:00:07 2005
@@ -0,0 +1,219 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.io.File;
+import java.util.Properties;
+
+import org.apache.nutch.io.*;
+import org.apache.nutch.fs.*;
+import org.apache.nutch.util.*;
+import org.apache.nutch.protocol.*;
+import org.apache.nutch.parse.*;
+import org.apache.nutch.mapred.*;
+
+import java.util.logging.*;
+
+/** The fetcher. Most of the work is done by plugins. */
+public class Fetcher extends NutchConfigured implements MapRunnable {
+
+ public static final Logger LOG =
+ LogFormatter.getLogger("org.apache.nutch.fetcher.Fetcher"); // NOTE(review): logger name says "fetcher" while this class lives in the "crawl" package - confirm intended
+
+ private RecordReader input; // source of (url, CrawlDatum) entries; assigned in run(), read by the fetcher threads
+ private OutputCollector output; // sink for FetcherOutput records; assigned in run(), written by the fetcher threads
+
+ private int activeThreads; // fetcher threads currently running; updated while holding the Fetcher lock
+
+ private long start = System.currentTimeMillis(); // start time of fetcher run
+
+ private long bytes; // total bytes fetched; updated in synchronized updateStatus()
+ private int pages; // total pages fetched; updated in synchronized updateStatus()
+ private int errors; // total pages errored; incremented while holding the Fetcher lock
+
+ private class FetcherThread extends Thread {
+ public void run() {
+ synchronized (Fetcher.this) {activeThreads++;} // count threads
+
+ try {
+ UTF8 key = new UTF8();
+ CrawlDatum datum = new CrawlDatum();
+
+ while (true) {
+ if (LogFormatter.hasLoggedSevere()) // something bad happened
+ break; // exit
+
+ try { // get next entry from input
+ if (!input.next(key, datum))
+ break; // at eof, exit
+ } catch (IOException e) {
+ LOG.severe("fetcher caught:"+e.toString());
+ break;
+ }
+
+ String url = key.toString();
+ try {
+ LOG.info("fetching " + url); // fetch the page
+
+ Protocol protocol = ProtocolFactory.getProtocol(url);
+ Content content = protocol.getContent(url);
+
+ output(url, datum, content, CrawlDatum.STATUS_FETCH_SUCCESS);
+
+ updateStatus(content.getContent().length);
+
+ } catch (ResourceGone e) { // don't retry
+ logError(url, e);
+ output(url, datum, null, CrawlDatum.STATUS_FETCH_FAIL_PERM);
+
+ } catch (Throwable t) { // retry all others
+ logError(url, t);
+ output(url, datum, null, CrawlDatum.STATUS_FETCH_FAIL_TEMP);
+
+ }
+ }
+
+ } catch (Throwable e) {
+ LOG.severe("fetcher caught:"+e.toString());
+ } finally {
+ synchronized (Fetcher.this) {activeThreads--;} // count threads
+ }
+ }
+
+ private void logError(String url, Throwable t) {
+ LOG.info("fetch of " + url + " failed with: " + t);
+ LOG.log(Level.FINE, "stack", t); // stack trace
+ synchronized (Fetcher.this) { // record failure
+ errors++;
+ }
+ }
+
+ private void output(String url, CrawlDatum datum,
+ Content content, int status) {
+ datum.setStatus(status);
+ if (content == null)
+ content = new Content(url, url, new byte[0], "", new Properties());
+ try {
+ output.collect(new UTF8(url), new FetcherOutput(datum, content));
+ } catch (IOException e) {
+ LOG.severe("fetcher caught:"+e.toString());
+ }
+ }
+
+ }
+
+ /** Creates a fetcher without a configuration; one is supplied later
+  * through {@link #configure(JobConf)}. */
+ public Fetcher() {
+   super(null);
+ }
+
+ /** Creates a fetcher that uses the given configuration. */
+ public Fetcher(NutchConf conf) {
+   super(conf);
+ }
+
+ /** Accounts for one successfully fetched page of <code>bytesInPage</code>
+  * bytes and, on every 100th page, logs running totals and rates. */
+ private synchronized void updateStatus(int bytesInPage) {
+   pages++;
+   bytes += bytesInPage;
+
+   if ((pages % 100) != 0) {                       // report only every 100pp
+     return;
+   }
+
+   long elapsed = (System.currentTimeMillis() - start)/1000;
+
+   String totals = "status: "
+     + pages + " pages, "
+     + errors + " errors, "
+     + bytes + " bytes, "
+     + elapsed + " seconds";
+   LOG.info(totals);
+
+   float pagesPerSecond = ((float)pages)/elapsed;
+   float kbPerSecond = ((((float)bytes)*8)/1024)/elapsed;
+   float bytesPerPage = ((float)bytes)/pages;
+   String rates = "status: "
+     + pagesPerSecond+" pages/s, "
+     + kbPerSecond+" kb/s, "
+     + bytesPerPage+" bytes/page";
+   LOG.info(rates);
+ }
+
+ public void configure(JobConf job) {
+ setConf(job);
+ if (job.getBoolean("fetcher.verbose", false)) {
+ LOG.setLevel(Level.FINE);
+ }
+ }
+
+ /** Fetches every entry of <code>input</code> using a pool of fetcher
+  * threads and returns once all of them have finished.
+  *
+  * The number of threads comes from "fetcher.threads.fetch" (default 10).
+  * Waiting is done by joining each thread, so completion does not depend on
+  * an unsynchronized read of the active-thread counter or on the workers
+  * having registered before the first check.
+  *
+  * @param input the reader supplying (url, CrawlDatum) entries
+  * @param output the collector receiving FetcherOutput records
+  */
+ public void run(RecordReader input, OutputCollector output)
+   throws IOException {
+
+   this.input = input;
+   this.output = output;
+
+   int threadCount =
+     Math.max(0, getConf().getInt("fetcher.threads.fetch", 10));
+   Thread[] workers = new Thread[threadCount];
+   for (int i = 0; i < workers.length; i++) {      // spawn threads
+     workers[i] = new FetcherThread();
+     workers[i].start();
+   }
+
+   boolean interrupted = false;
+   for (int i = 0; i < workers.length; i++) {      // wait for threads to exit
+     boolean joined = false;
+     while (!joined) {
+       try {
+         workers[i].join();
+         joined = true;
+       } catch (InterruptedException e) {
+         interrupted = true;                       // keep waiting, remember it
+       }
+     }
+   }
+
+   if (interrupted) {                              // don't swallow the interrupt
+     Thread.currentThread().interrupt();
+   }
+ }
+
+ /** Configures and runs a fetch job: reads (UTF8, CrawlDatum) entries from
+  * sequence files in <code>inputDir</code>, runs this class as the map
+  * runner with the given thread count, and writes (UTF8, FetcherOutput)
+  * records to <code>outputDir</code> through FetcherOutputFormat. */
+ public void fetch(File inputDir, File outputDir, int threads)
+   throws IOException {
+
+   JobConf fetchJob = new JobConf(getConf());
+
+   fetchJob.setInt("fetcher.threads.fetch", threads);
+
+   // input side
+   fetchJob.setInputDir(inputDir);
+   fetchJob.setInputFormat(SequenceFileInputFormat.class);
+   fetchJob.setInputKeyClass(UTF8.class);
+   fetchJob.setInputValueClass(CrawlDatum.class);
+
+   // the fetcher itself drives the map phase
+   fetchJob.setMapRunnerClass(Fetcher.class);
+
+   // output side
+   fetchJob.setOutputDir(outputDir);
+   fetchJob.setOutputFormat(FetcherOutputFormat.class);
+   fetchJob.setOutputKeyClass(UTF8.class);
+   fetchJob.setOutputValueClass(FetcherOutput.class);
+
+   JobClient.runJob(fetchJob);
+ }
+
+
+ /** Run the fetcher.
+  *
+  * Usage: <code>Fetcher &lt;inDir&gt; &lt;outDir&gt; [-threads n]</code>.
+  * The thread count defaults to "fetcher.threads.fetch" (10 if unset).
+  * Prints the usage and exits with -1 when fewer than two arguments are
+  * given, when <code>-threads</code> has no value, or when its value is not
+  * an integer.
+  */
+ public static void main(String[] args) throws Exception {
+
+   String usage = "Usage: Fetcher <inDir> <outDir> [-threads n]";
+
+   if (args.length < 2) {
+     System.err.println(usage);
+     System.exit(-1);
+   }
+
+   File inDir = new File(args[0]);
+   File outDir = new File(args[1]);
+
+   NutchConf conf = NutchConf.get();
+
+   int threads = conf.getInt("fetcher.threads.fetch", 10);
+
+   for (int i = 2; i < args.length; i++) {         // parse command line
+     if (args[i].equals("-threads")) {             // found -threads option
+       if (i + 1 >= args.length) {                 // option value is missing
+         System.err.println(usage);
+         System.exit(-1);
+       }
+       String value = args[++i];
+       try {
+         threads = Integer.parseInt(value);
+       } catch (NumberFormatException e) {         // value is not a number
+         System.err.println("Invalid thread count: " + value);
+         System.err.println(usage);
+         System.exit(-1);
+       }
+     }
+   }
+
+   Fetcher fetcher = new Fetcher(conf);            // make a Fetcher
+
+   fetcher.fetch(inDir, outDir, threads);          // run the Fetcher
+
+ }
+}
Added:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutput.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutput.java?rev=178695&view=auto
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutput.java
(added)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutput.java
Thu May 26 15:00:07 2005
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.*;
+
+import org.apache.nutch.io.*;
+import org.apache.nutch.fs.*;
+import org.apache.nutch.util.*;
+import org.apache.nutch.protocol.Content;
+
+/* An entry in the fetcher's output. */
+public final class FetcherOutput implements Writable {
+ private CrawlDatum crawlDatum;
+ private Content content;
+
+ public FetcherOutput() {}
+
+ public FetcherOutput(CrawlDatum crawlDatum, Content content) {
+ this.crawlDatum = crawlDatum;
+ this.content = content;
+ }
+
+ public final void readFields(DataInput in) throws IOException {
+ this.crawlDatum = CrawlDatum.read(in);
+ this.content = Content.read(in);
+ }
+
+ public final void write(DataOutput out) throws IOException {
+ crawlDatum.write(out);
+ content.write(out);
+ }
+
+ public CrawlDatum getCrawlDatum() { return crawlDatum; }
+ public Content getContent() { return content; }
+
+ public boolean equals(Object o) {
+ if (!(o instanceof FetcherOutput))
+ return false;
+ FetcherOutput other = (FetcherOutput)o;
+ return
+ this.crawlDatum.equals(other.crawlDatum) &&
+ this.content.equals(other.content);
+ }
+
+ public String toString() {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append("CrawlDatum: " + crawlDatum+"\n" );
+ return buffer.toString();
+ }
+
+}
Added:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java?rev=178695&view=auto
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java
(added)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java
Thu May 26 15:00:07 2005
@@ -0,0 +1,71 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.io.File;
+
+import org.apache.nutch.fs.NutchFileSystem;
+
+import org.apache.nutch.io.MapFile;
+import org.apache.nutch.io.WritableComparable;
+import org.apache.nutch.io.Writable;
+import org.apache.nutch.io.UTF8;
+
+import org.apache.nutch.mapred.OutputFormat;
+import org.apache.nutch.mapred.RecordWriter;
+import org.apache.nutch.mapred.JobConf;
+
+import org.apache.nutch.protocol.Content;
+
+/** Splits FetcherOutput entries into multiple map files: the crawl state
+ * goes to one map file and the fetched content to another, both keyed by
+ * the record key. */
+public class FetcherOutputFormat implements OutputFormat {
+
+  /** Opens the two map files under <code>&lt;outputDir&gt;/&lt;name&gt;</code>
+   * (sub-directories CrawlDatum.DIR_NAME and Content.DIR_NAME) and returns a
+   * writer that appends each FetcherOutput's parts to them.
+   *
+   * @param fs the file system to write to
+   * @param job the job whose output directory is used
+   * @param name the name of this output part
+   * @throws IOException if either map file cannot be created
+   */
+  public RecordWriter getRecordWriter(NutchFileSystem fs, JobConf job,
+                                      String name) throws IOException {
+
+    File dir = new File(job.getOutputDir(), name);
+
+    final MapFile.Writer crawlOut =
+      new MapFile.Writer(fs, new File(dir, CrawlDatum.DIR_NAME).toString(),
+                         UTF8.class, CrawlDatum.class);
+
+    MapFile.Writer opened = null;
+    try {
+      opened =
+        new MapFile.Writer(fs, new File(dir, Content.DIR_NAME).toString(),
+                           UTF8.class, Content.class);
+    } finally {
+      if (opened == null)                  // second open failed:
+        crawlOut.close();                  // don't leak the first writer
+    }
+    final MapFile.Writer contentOut = opened;
+
+    return new RecordWriter() {
+
+        /** Appends the value's crawl state and content under the key. */
+        public void write(WritableComparable key, Writable value)
+          throws IOException {
+
+          FetcherOutput fo = (FetcherOutput)value;
+
+          crawlOut.append(key, fo.getCrawlDatum());
+          contentOut.append(key, fo.getContent());
+        }
+
+        /** Closes both map files, even if closing the first one fails. */
+        public void close() throws IOException {
+          try {
+            crawlOut.close();
+          } finally {
+            contentOut.close();
+          }
+        }
+
+      };
+
+  }
+}
+
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java?rev=178695&r1=178694&r2=178695&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
Thu May 26 15:00:07 2005
@@ -35,18 +35,15 @@
LogFormatter.getLogger("org.apache.nutch.crawl.Generator");
private File dbDir;
- private boolean refetchOnly;
/** Selects entries due for fetch. */
public static class Selector implements Mapper, Partitioner, Reducer {
private long curTime;
- private boolean refetchOnly;
private long limit;
private long count;
public void configure(JobConf job) {
curTime = job.getLong("crawl.gen.curTime", System.currentTimeMillis());
- refetchOnly = job.getBoolean("crawl.gen.refetchOnly", false);
limit = job.getLong("crawl.gen.limit", Long.MAX_VALUE);
}
@@ -55,12 +52,11 @@
OutputCollector output) throws IOException {
CrawlDatum crawlDatum = (CrawlDatum)value;
- if (crawlDatum.getNextFetchTime() > curTime)
- return;
+ if (crawlDatum.getStatus() == CrawlDatum.STATUS_DB_GONE)
+ return; // don't retry
- if (refetchOnly
- && crawlDatum.getStatus() == CrawlDatum.STATUS_DB_UNFETCHED)
- return;
+ if (crawlDatum.getNextFetchTime() > curTime)
+ return; // not time yet
output.collect(crawlDatum, key); // invert for sort by linkCount
}
@@ -82,6 +78,22 @@
}
+ /** Sort fetch lists by hash of URL. Keys are compared on their serialized
+  * bytes; keys whose hashes collide compare as equal. */
+ public static class HashComparator extends WritableComparator {
+   public HashComparator() { super(UTF8.class); }
+
+   /** Orders two serialized keys by their hash values. The hashes are
+    * compared explicitly rather than subtracted, since the difference of
+    * two ints can overflow and report the wrong order. */
+   public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+     int h1 = hash(b1, s1, l1);
+     int h2 = hash(b2, s2, l2);
+     return (h1 < h2) ? -1 : ((h1 == h2) ? 0 : 1);
+   }
+
+   /** Polynomial (base 31) hash of <code>length</code> bytes beginning at
+    * <code>start</code>. */
+   private static int hash(byte[] bytes, int start, int length) {
+     int hash = 1;
+     for (int i = 0; i < length; i++)
+       hash = (31 * hash) + (int)bytes[start+i];
+     return hash;
+   }
+ }
+
/** Construct a generator. */
public Generator(NutchConf conf, File dbDir) {
super(conf);
@@ -89,23 +101,21 @@
}
/** Generate fetchlists. */
- public void generate(File dir, int numLists, long topN, long curTime,
- boolean refetchOnly)
+ public void generate(File dir, int numLists, long topN, long curTime)
throws IOException {
File tempDir =
new File("generate-temp-"+
Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
- // map to inverted subset due for fetch
+ // map to inverted subset due for fetch, sort by link count
JobConf job = new JobConf(getConf());
job.setLong("crawl.gen.curTime", curTime);
- job.setBoolean("crawl.gen.refetchOnly", refetchOnly);
job.setLong("crawl.gen.limit", topN / job.getNumReduceTasks());
job.setInputDir(new File(dbDir, "current"));
- job.setInputFormat(InputFormats.get("seq"));
+ job.setInputFormat(SequenceFileInputFormat.class);
job.setInputKeyClass(UTF8.class);
job.setInputValueClass(CrawlDatum.class);
@@ -114,18 +124,18 @@
job.setReducerClass(Selector.class);
job.setOutputDir(tempDir);
- job.setOutputFormat(OutputFormats.get("seq"));
+ job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(CrawlDatum.class);
job.setOutputValueClass(UTF8.class);
JobClient.runJob(job);
- // invert again and paritition by host
+ // invert again, partition by host, sort by url hash
job = new JobConf(getConf());
job.setInt("partition.url.by.host.seed", new Random().nextInt());
job.setInputDir(tempDir);
- job.setInputFormat(InputFormats.get("seq"));
+ job.setInputFormat(SequenceFileInputFormat.class);
job.setInputKeyClass(CrawlDatum.class);
job.setInputValueClass(UTF8.class);
@@ -134,9 +144,10 @@
job.setNumReduceTasks(numLists);
job.setOutputDir(dir);
- job.setOutputFormat(OutputFormats.get("seq"));
+ job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(UTF8.class);
job.setOutputValueClass(CrawlDatum.class);
+ job.setOutputKeyComparatorClass(HashComparator.class);
JobClient.runJob(job);
new JobClient(getConf()).getFs().delete(tempDir);
@@ -147,21 +158,18 @@
*/
public static void main(String args[]) throws Exception {
if (args.length < 2) {
- System.out.println("Usage: Generator <crawldb> <segments_dir>
[-refetchonly] [-topN N] [-numFetchers numFetchers] [-adddays numDays]");
+ System.out.println("Usage: Generator <crawldb> <segments_dir> [-topN N]
[-numFetchers numFetchers] [-adddays numDays]");
return;
}
File dbDir = new File(args[0]);
File segmentsDir = new File(args[1]);
long curTime = System.currentTimeMillis();
- boolean refetchOnly = false;
long topN = Long.MAX_VALUE;
int numFetchers = 1;
for (int i = 2; i < args.length; i++) {
- if ("-refetchonly".equals(args[i])) {
- refetchOnly = true;
- } else if ("-topN".equals(args[i])) {
+ if ("-topN".equals(args[i])) {
topN = Long.parseLong(args[i+1]);
i++;
} else if ("-numFetchers".equals(args[i])) {
@@ -177,7 +185,7 @@
if (topN != Long.MAX_VALUE)
LOG.info("topN:" + topN);
Generator gen = new Generator(NutchConf.get(), dbDir);
- gen.generate(segmentsDir, numFetchers, topN, curTime, refetchOnly);
+ gen.generate(segmentsDir, numFetchers, topN, curTime);
LOG.info("Generator completed");
}
}
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java?rev=178695&r1=178694&r2=178695&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java
Thu May 26 15:00:07 2005
@@ -77,7 +77,7 @@
sortJob.setMapperClass(InjectMapper.class);
sortJob.setOutputDir(tempDir);
- sortJob.setOutputFormat(OutputFormats.get("seq"));
+ sortJob.setOutputFormat(SequenceFileOutputFormat.class);
sortJob.setOutputKeyClass(UTF8.class);
sortJob.setOutputValueClass(CrawlDatum.class);
JobClient.runJob(sortJob);
@@ -89,7 +89,7 @@
JobConf mergeJob = new JobConf(getConf());
mergeJob.addInputDir(tempDir);
mergeJob.addInputDir(new File(crawlDb, "current/"));
- mergeJob.setInputFormat(InputFormats.get("seq"));
+ mergeJob.setInputFormat(SequenceFileInputFormat.class);
mergeJob.setInputKeyClass(UTF8.class);
mergeJob.setInputValueClass(CrawlDatum.class);
@@ -98,7 +98,7 @@
mergeJob.setReducerClass(CrawlDBReducer.class);
mergeJob.setOutputDir(newCrawlDb);
- mergeJob.setOutputFormat(OutputFormats.get("seq"));
+ mergeJob.setOutputFormat(SequenceFileOutputFormat.class);
mergeJob.setOutputKeyClass(UTF8.class);
mergeJob.setOutputValueClass(CrawlDatum.class);
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java?rev=178695&r1=178694&r2=178695&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java
Thu May 26 15:00:07 2005
@@ -42,13 +42,8 @@
this.job = job;
this.out = out;
this.combiner = (Reducer)job.newInstance(job.getCombinerClass());
-
- try {
- this.keyToValues = new TreeMap
- ((Comparator)job.getOutputKeyComparatorClass().newInstance());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ this.keyToValues = new TreeMap
+ ((Comparator)job.newInstance(job.getOutputKeyComparatorClass()));
}
public synchronized void collect(WritableComparable key, Writable value)
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormat.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormat.java?rev=178695&r1=178694&r2=178695&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormat.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormat.java
Thu May 26 15:00:07 2005
@@ -26,11 +26,6 @@
* Files are processed as sequences of records, implementing {@link
* RecordReader}. Files must thus be split on record boundaries. */
public interface InputFormat {
-
- /** The name of this input format.
- * @see InputFormats
- */
- String getName();
/** Splits a set of input files. One split is created per map task.
*
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormatBase.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormatBase.java?rev=178695&r1=178694&r2=178695&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormatBase.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormatBase.java
Thu May 26 15:00:07 2005
@@ -29,8 +29,6 @@
private static final double SPLIT_SLOP = 0.1; // 10% slop
- public abstract String getName();
-
public abstract RecordReader getRecordReader(NutchFileSystem fs,
FileSplit split,
JobConf job) throws IOException;
@@ -42,7 +40,10 @@
File[] dirs = job.getInputDirs();
ArrayList files = new ArrayList();
for (int i = 0; i < dirs.length; i++) {
- files.addAll(Arrays.asList(fs.listFiles(dirs[i])));
+ File[] dir = fs.listFiles(dirs[i]);
+ if (dir != null) {
+ files.addAll(Arrays.asList(dir));
+ }
}
if (files.size() == 0) {
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java?rev=178695&r1=178694&r2=178695&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConf.java
Thu May 26 15:00:07 2005
@@ -108,16 +108,20 @@
public void setOutputDir(File dir) { set("mapred.output.dir", dir); }
public InputFormat getInputFormat() {
- return InputFormats.get(get("mapred.input.format", "text"));
+ return (InputFormat)newInstance(getClass("mapred.input.format.class",
+ TextInputFormat.class,
+ InputFormat.class));
}
- public void setInputFormat(InputFormat format) {
- set("mapred.input.format", format.getName());
+ public void setInputFormat(Class theClass) {
+ setClass("mapred.input.format.class", theClass, InputFormat.class);
}
public OutputFormat getOutputFormat() {
- return OutputFormats.get(get("mapred.output.format", "text"));
+ return (OutputFormat)newInstance(getClass("mapred.output.format.class",
+ TextOutputFormat.class,
+ OutputFormat.class));
}
- public void setOutputFormat(OutputFormat format) {
- set("mapred.output.format", format.getName());
+ public void setOutputFormat(Class theClass) {
+ setClass("mapred.output.format.class", theClass, OutputFormat.class);
}
public Class getInputKeyClass() {
@@ -205,14 +209,15 @@
public int getNumReduceTasks() { return getInt("mapred.reduce.tasks", 10); }
public void setNumReduceTasks(int n) { setInt("mapred.reduce.tasks", n); }
- public Configurable newInstance(Class theClass) {
- Configurable result;
+ public Object newInstance(Class theClass) {
+ Object result;
try {
- result = (Configurable)theClass.newInstance();
+ result = theClass.newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
- result.configure(this);
+ if (result instanceof JobConfigurable)
+ ((JobConfigurable)result).configure(this);
return result;
}
Copied:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConfigurable.java
(from r178047,
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Configurable.java)
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConfigurable.java?p2=incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConfigurable.java&p1=incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Configurable.java&r1=178047&r2=178695&rev=178695&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Configurable.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobConfigurable.java
Thu May 26 15:00:07 2005
@@ -16,13 +16,8 @@
package org.apache.nutch.mapred;
-import java.io.IOException;
-
-import org.apache.nutch.io.Writable;
-import org.apache.nutch.io.WritableComparable;
-
/** That what may be configured. */
-public interface Configurable {
+public interface JobConfigurable {
/** Initializes a new instance from a {@link JobConf}.
*
* @param job the configuration
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java?rev=178695&r1=178694&r2=178695&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java
Thu May 26 15:00:07 2005
@@ -23,7 +23,7 @@
/** Expert: Permits greater control of map processing. For example,
* implementations might perform multi-threaded, asynchronous mappings. */
-public interface MapRunnable extends Configurable {
+public interface MapRunnable extends JobConfigurable {
/** Called to execute mapping. Mapping is complete when this returns.
* @param input the {@link RecordReader} with input key/value pairs.
* @param output the {@link OutputCollector} for mapped key/value pairs.
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java?rev=178695&r1=178694&r2=178695&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java
Thu May 26 15:00:07 2005
@@ -23,28 +23,25 @@
/** Default {@link MapRunnable} implementation.*/
public class MapRunner implements MapRunnable {
+ private JobConf job;
private Mapper mapper;
private Class inputKeyClass;
private Class inputValueClass;
public void configure(JobConf job) {
- mapper = (Mapper)job.newInstance(job.getMapperClass());
- inputKeyClass = job.getInputKeyClass();
- inputValueClass = job.getInputValueClass();
+ this.job = job;
+ this.mapper = (Mapper)job.newInstance(job.getMapperClass());
+ this.inputKeyClass = job.getInputKeyClass();
+ this.inputValueClass = job.getInputValueClass();
}
public void run(RecordReader input, OutputCollector output)
throws IOException {
while (true) {
// allocate new key & value instances
- WritableComparable key = null;
- Writable value = null;
- try {
- key = (WritableComparable)inputKeyClass.newInstance();
- value = (Writable)inputValueClass.newInstance();
- } catch (Exception e) {
- throw new IOException(e.toString());
- }
+ WritableComparable key =
+ (WritableComparable)job.newInstance(inputKeyClass);
+ Writable value = (Writable)job.newInstance(inputValueClass);
// read next key & value
if (!input.next(key, value))
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Mapper.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Mapper.java?rev=178695&r1=178694&r2=178695&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Mapper.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Mapper.java
Thu May 26 15:00:07 2005
@@ -25,7 +25,7 @@
* intermediate values associated with a given output key are subsequently
* grouped by the map/reduce system, and passed to a {@link Reducer} to
* determine the final output.. */
-public interface Mapper extends Configurable {
+public interface Mapper extends JobConfigurable {
/** Maps a single input key/value pair into intermediate key/value pairs.
* Output pairs need not be of the same types as input pairs. A given input
* pair may map to zero or many output pairs. Output pairs are collected
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/OutputFormat.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/OutputFormat.java?rev=178695&r1=178694&r2=178695&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/OutputFormat.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/OutputFormat.java
Thu May 26 15:00:07 2005
@@ -24,11 +24,6 @@
/** An output data format. Output files are stored in a {@link
* NutchFileSystem}. */
public interface OutputFormat {
-
- /** The name of this output format.
- * @see OutputFormats
- */
- String getName();
/** Construct a {@link RecordWriter}.
*
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Partitioner.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Partitioner.java?rev=178695&r1=178694&r2=178695&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Partitioner.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Partitioner.java
Thu May 26 15:00:07 2005
@@ -20,7 +20,7 @@
import org.apache.nutch.io.WritableComparable;
/** Partitions the key space. A partition is created for each reduce task. */
-public interface Partitioner extends Configurable {
+public interface Partitioner extends JobConfigurable {
/** Returns the partition number for a given entry given the total number of
* partitions. Typically a hash function on all or a subset of the key.
*
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java?rev=178695&r1=178694&r2=178695&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
Thu May 26 15:00:07 2005
@@ -146,14 +146,9 @@
new SequenceFile.Writer(lfs, file, keyClass, valueClass);
try {
// append all input files into a single input file
- WritableComparable key;
- Writable value;
- try {
- key = (WritableComparable)keyClass.newInstance();
- value = (Writable)valueClass.newInstance();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ WritableComparable key = (WritableComparable)job.newInstance(keyClass);
+ Writable value = (Writable)job.newInstance(valueClass);
+
for (int i = 0; i < mapTaskIds.length; i++) {
String partFile =
MapOutputFile.getInputFile(mapTaskIds[i], getTaskId()).toString();
@@ -175,7 +170,7 @@
WritableComparator comparator = null;
try {
comparator =
- (WritableComparator)job.getOutputKeyComparatorClass().newInstance();
+ (WritableComparator)job.newInstance(job.getOutputKeyComparatorClass());
} catch (Exception e) {
throw new RuntimeException(e);
}
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reducer.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reducer.java?rev=178695&r1=178694&r2=178695&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reducer.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reducer.java
Thu May 26 15:00:07 2005
@@ -25,7 +25,7 @@
/** Reduces a set of intermediate values which share a key to a smaller set of
* values. Input values are the grouped output of a {@link Mapper}. */
-public interface Reducer extends Configurable {
+public interface Reducer extends JobConfigurable {
/** Combines values for a given key. Output values must be of the same type
* as input values. Input keys must not be altered. Typically all values
* are combined into zero or one value. Output pairs are collected with
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java?rev=178695&r1=178694&r2=178695&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
Thu May 26 15:00:07 2005
@@ -32,8 +32,6 @@
* the position in the file, and values are the line of text.. */
public class SequenceFileInputFormat extends InputFormatBase {
- public String getName() { return "seq"; }
-
public RecordReader getRecordReader(NutchFileSystem fs, FileSplit split,
JobConf job) throws IOException {
@@ -45,15 +43,19 @@
in.sync(split.getStart()); // sync to start
return new RecordReader() {
+ private boolean more = true;
+
public synchronized boolean next(Writable key, Writable value)
throws IOException {
+ if (!more) return false;
long pos = in.getPosition();
- boolean more = in.next(key, value);
+ boolean eof = in.next(key, value);
if (pos >= end && in.syncSeen()) {
- return false;
+ more = false;
} else {
- return more;
+ more = eof;
}
+ return more;
}
public synchronized long getPos() throws IOException {
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileOutputFormat.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileOutputFormat.java?rev=178695&r1=178694&r2=178695&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileOutputFormat.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileOutputFormat.java
Thu May 26 15:00:07 2005
@@ -26,8 +26,6 @@
import org.apache.nutch.io.Writable;
public class SequenceFileOutputFormat implements OutputFormat {
-
- public String getName() { return "seq"; }
public RecordWriter getRecordWriter(NutchFileSystem fs, JobConf job,
String name) throws IOException {
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextInputFormat.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextInputFormat.java?rev=178695&r1=178694&r2=178695&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextInputFormat.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextInputFormat.java
Thu May 26 15:00:07 2005
@@ -32,8 +32,6 @@
* the position in the file, and values are the line of text.. */
public class TextInputFormat extends InputFormatBase {
- public String getName() { return "text"; }
-
public RecordReader getRecordReader(NutchFileSystem fs, FileSplit split,
JobConf job) throws IOException {
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextOutputFormat.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextOutputFormat.java?rev=178695&r1=178694&r2=178695&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextOutputFormat.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TextOutputFormat.java
Thu May 26 15:00:07 2005
@@ -26,8 +26,6 @@
import org.apache.nutch.io.Writable;
public class TextOutputFormat implements OutputFormat {
-
- public String getName() { return "text"; }
public RecordWriter getRecordWriter(NutchFileSystem fs, JobConf job,
String name) throws IOException {
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/demo/Grep.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/demo/Grep.java?rev=178695&r1=178694&r2=178695&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/demo/Grep.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/demo/Grep.java
Thu May 26 15:00:07 2005
@@ -18,8 +18,8 @@
import org.apache.nutch.mapred.JobConf;
import org.apache.nutch.mapred.JobClient;
import org.apache.nutch.mapred.RunningJob;
-import org.apache.nutch.mapred.InputFormats;
-import org.apache.nutch.mapred.OutputFormats;
+import org.apache.nutch.mapred.SequenceFileOutputFormat;
+import org.apache.nutch.mapred.SequenceFileInputFormat;
import org.apache.nutch.mapred.lib.RegexMapper;
import org.apache.nutch.mapred.lib.InverseMapper;
@@ -66,7 +66,7 @@
grepJob.setNumReduceTasks(6);
grepJob.setOutputDir(tempDir);
- grepJob.setOutputFormat(OutputFormats.get("seq"));
+ grepJob.setOutputFormat(SequenceFileOutputFormat.class);
grepJob.setOutputKeyClass(UTF8.class);
grepJob.setOutputValueClass(LongWritable.class);
@@ -77,7 +77,7 @@
sortJob.setNumMapTasks(6);
sortJob.setInputDir(tempDir);
- sortJob.setInputFormat(InputFormats.get("seq"));
+ sortJob.setInputFormat(SequenceFileInputFormat.class);
sortJob.setInputKeyClass(UTF8.class);
sortJob.setInputValueClass(LongWritable.class);
-------------------------------------------------------
This SF.Net email is sponsored by Yahoo.
Introducing Yahoo! Search Developer Network - Create apps using Yahoo!
Search APIs Find out how you can build Yahoo! directly into your own
Applications - visit http://developer.yahoo.net/?fr=offad-ysdn-ostg-q22005
_______________________________________________
Nutch-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/nutch-cvs