Author: cutting
Date: Wed Jun 1 12:54:55 2005
New Revision: 179408
URL: http://svn.apache.org/viewcvs?rev=179408&view=rev
Log:
First working version of MapReduce-based parse and updatedb. The
MapReduce-based crawl loop is now complete.
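For reference, the whole loop can now be driven from bin/nutch. The sketch
below is illustrative only: the paths and argument forms are assumptions
based on the tools touched here (Injector, Generator, Fetcher, ParseSegment,
CrawlDb), not taken verbatim from this change.

  bin/nutch inject   crawldb urls            # seed the crawl db (Injector)
  bin/nutch generate crawldb segments        # write crawl_generate (Generator)
  s=$(ls -d segments/* | sort | tail -1)     # newest segment, named by date by generate
  bin/nutch fetch    $s                      # write crawl_fetch + content (Fetcher)
  bin/nutch parse    $s                      # write parse data + crawl_parse (ParseSegment)
  bin/nutch updatedb crawldb $s              # merge fetch/parse output into current/ (CrawlDb)
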
Added:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDb.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
- copied, changed from r179236,
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapFileOutputFormat.java
Removed:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java
Modified:
incubator/nutch/branches/mapred/bin/nutch
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
incubator/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java
Modified: incubator/nutch/branches/mapred/bin/nutch
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/bin/nutch?rev=179408&r1=179407&r2=179408&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/bin/nutch (original)
+++ incubator/nutch/branches/mapred/bin/nutch Wed Jun 1 12:54:55 2005
@@ -34,10 +34,10 @@
echo " generate generate new segments to fetch"
echo " fetch fetch a segment's pages"
echo " parse parse a segment's pages"
+ echo " updatedb update crawl db from segments after fetching"
echo " index run the indexer on a segment's fetcher output"
echo " merge merge several segment indexes"
echo " dedup remove duplicates from a set of segment indexes"
- echo " updatedb update db from segments after fetching"
echo " updatesegs update segments with link data from the db"
echo " mergesegs merge multiple segments into a single segment"
echo " readdb examine arbitrary fields of the database"
@@ -135,15 +135,15 @@
elif [ "$COMMAND" = "fetch" ] ; then
CLASS=org.apache.nutch.crawl.Fetcher
elif [ "$COMMAND" = "parse" ] ; then
- CLASS=org.apache.nutch.tools.ParseSegment
+ CLASS=org.apache.nutch.crawl.ParseSegment
+elif [ "$COMMAND" = "updatedb" ] ; then
+ CLASS=org.apache.nutch.crawl.CrawlDb
elif [ "$COMMAND" = "index" ] ; then
CLASS=org.apache.nutch.indexer.IndexSegment
elif [ "$COMMAND" = "merge" ] ; then
CLASS=org.apache.nutch.indexer.IndexMerger
elif [ "$COMMAND" = "dedup" ] ; then
CLASS=org.apache.nutch.indexer.DeleteDuplicates
-elif [ "$COMMAND" = "updatedb" ] ; then
- CLASS=org.apache.nutch.tools.UpdateDatabaseTool
elif [ "$COMMAND" = "updatesegs" ] ; then
CLASS=org.apache.nutch.tools.UpdateSegmentsFromDb
elif [ "$COMMAND" = "mergesegs" ] ; then
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java?rev=179408&r1=179407&r2=179408&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java
Wed Jun 1 12:54:55 2005
@@ -25,7 +25,11 @@
/* The crawl state of a url. */
public class CrawlDatum implements WritableComparable, Cloneable {
- public static final String DIR_NAME = "crawl";
+ public static final String DB_DIR_NAME = "current";
+
+ public static final String GENERATE_DIR_NAME = "crawl_generate";
+ public static final String FETCH_DIR_NAME = "crawl_fetch";
+ public static final String PARSE_DIR_NAME = "crawl_parse";
private final static byte CUR_VERSION = 1;
Added:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDb.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDb.java?rev=179408&view=auto
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDb.java
(added)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDb.java
Wed Jun 1 12:54:55 2005
@@ -0,0 +1,99 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.*;
+import java.util.*;
+import java.util.logging.*;
+
+import org.apache.nutch.io.*;
+import org.apache.nutch.fs.*;
+import org.apache.nutch.net.*;
+import org.apache.nutch.util.*;
+import org.apache.nutch.mapred.*;
+import org.apache.nutch.parse.*;
+
+/** This class updates an existing crawl db with the crawl_fetch and
+ * crawl_parse output of a fetched segment. */
+public class CrawlDb extends NutchConfigured {
+
+ public static final Logger LOG =
+ LogFormatter.getLogger("org.apache.nutch.crawl.CrawlDb");
+
+ /** Construct a CrawlDb. */
+ public CrawlDb(NutchConf conf) {
+ super(conf);
+ }
+
+ public void update(File crawlDb, File segment) throws IOException {
+ JobConf job = CrawlDb.createJob(getConf(), crawlDb);
+ job.addInputDir(new File(segment, CrawlDatum.FETCH_DIR_NAME));
+ job.addInputDir(new File(segment, CrawlDatum.PARSE_DIR_NAME));
+ JobClient.runJob(job);
+ CrawlDb.install(job, crawlDb);
+ }
+
+ public static JobConf createJob(NutchConf config, File crawlDb) {
+ File newCrawlDb =
+ new File(crawlDb,
+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+ JobConf job = new JobConf(config);
+
+ job.setInt("partition.url.by.host.seed", new Random().nextInt());
+ job.setPartitionerClass(PartitionUrlByHost.class);
+
+ job.addInputDir(new File(crawlDb, CrawlDatum.DB_DIR_NAME));
+ job.setInputFormat(SequenceFileInputFormat.class);
+ job.setInputKeyClass(UTF8.class);
+ job.setInputValueClass(CrawlDatum.class);
+
+ job.setReducerClass(CrawlDbReducer.class);
+
+ job.setOutputDir(newCrawlDb);
+ job.setOutputFormat(MapFileOutputFormat.class);
+ job.setOutputKeyClass(UTF8.class);
+ job.setOutputValueClass(CrawlDatum.class);
+
+ return job;
+ }
+
+ public static void install(JobConf job, File crawlDb) throws IOException {
+ File newCrawlDb = job.getOutputDir();
+ NutchFileSystem fs = new JobClient(job).getFs();
+ File old = new File(crawlDb, "old");
+ File current = new File(crawlDb, CrawlDatum.DB_DIR_NAME);
+ fs.delete(old);
+ fs.rename(current, old);
+ fs.rename(newCrawlDb, current);
+ fs.delete(old);
+ }
+
+ public static void main(String[] args) throws Exception {
+ CrawlDb crawlDb = new CrawlDb(NutchConf.get());
+
+ if (args.length < 2) {
+ System.err.println("Usage: <crawldb> <segment>");
+ return;
+ }
+
+ crawlDb.update(new File(args[0]), new File(args[1]));
+ }
+
+
+
+}
Copied:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
(from r179236,
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java)
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java?p2=incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java&p1=incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java&r1=179236&r2=179408&rev=179408&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
Wed Jun 1 12:54:55 2005
@@ -24,7 +24,7 @@
import org.apache.nutch.mapred.*;
/** Merge new page entries with existing entries. */
-public class CrawlDBReducer implements Reducer {
+public class CrawlDbReducer implements Reducer {
private int retryMax;
public void configure(JobConf job) {
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java?rev=179408&r1=179407&r2=179408&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
Wed Jun 1 12:54:55 2005
@@ -173,7 +173,7 @@
job.setInt("fetcher.threads.fetch", threads);
- job.setInputDir(new File(segment, "fetchlist"));
+ job.setInputDir(new File(segment, CrawlDatum.GENERATE_DIR_NAME));
job.setInputFormat(SequenceFileInputFormat.class);
job.setInputKeyClass(UTF8.class);
job.setInputValueClass(CrawlDatum.class);
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java?rev=179408&r1=179407&r2=179408&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java
Wed Jun 1 12:54:55 2005
@@ -38,13 +38,13 @@
public RecordWriter getRecordWriter(NutchFileSystem fs, JobConf job,
String name) throws IOException {
- File crawl =
- new File(new File(job.getOutputDir(), CrawlDatum.DIR_NAME), name);
+ File fetch =
+ new File(new File(job.getOutputDir(), CrawlDatum.FETCH_DIR_NAME), name);
File content =
new File(new File(job.getOutputDir(), Content.DIR_NAME), name);
- final MapFile.Writer crawlOut =
- new MapFile.Writer(fs, crawl.toString(), UTF8.class, CrawlDatum.class);
+ final MapFile.Writer fetchOut =
+ new MapFile.Writer(fs, fetch.toString(), UTF8.class, CrawlDatum.class);
final MapFile.Writer contentOut =
new MapFile.Writer(fs, content.toString(), UTF8.class, Content.class);
@@ -56,12 +56,12 @@
FetcherOutput fo = (FetcherOutput)value;
- crawlOut.append(key, fo.getCrawlDatum());
+ fetchOut.append(key, fo.getCrawlDatum());
contentOut.append(key, fo.getContent());
}
public void close() throws IOException {
- crawlOut.close();
+ fetchOut.close();
contentOut.close();
}
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java?rev=179408&r1=179407&r2=179408&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
Wed Jun 1 12:54:55 2005
@@ -109,7 +109,7 @@
Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
File segment = new File(segments, getDate());
- File output = new File(segment, "fetchlist");
+ File output = new File(segment, CrawlDatum.GENERATE_DIR_NAME);
// map to inverted subset due for fetch, sort by link count
JobConf job = new JobConf(getConf());
@@ -117,7 +117,7 @@
job.setLong("crawl.gen.curTime", curTime);
job.setLong("crawl.gen.limit", topN / job.getNumReduceTasks());
- job.setInputDir(new File(dbDir, "current"));
+ job.setInputDir(new File(dbDir, CrawlDatum.DB_DIR_NAME));
job.setInputFormat(SequenceFileInputFormat.class);
job.setInputKeyClass(UTF8.class);
job.setInputValueClass(CrawlDatum.class);
@@ -191,7 +191,7 @@
LOG.info("Generator started");
if (topN != Long.MAX_VALUE)
- LOG.info("topN:" + topN);
+ LOG.info("topN: " + topN);
Generator gen = new Generator(NutchConf.get(), dbDir);
gen.generate(segmentsDir, numFetchers, topN, curTime);
LOG.info("Generator completed");
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java?rev=179408&r1=179407&r2=179408&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java
Wed Jun 1 12:54:55 2005
@@ -61,6 +61,16 @@
}
}
+ /** Combine multiple new entries for a url. */
+ public static class InjectReducer implements Reducer {
+ public void configure(JobConf job) {}
+
+ public void reduce(WritableComparable key, Iterator values,
+ OutputCollector output) throws IOException {
+ output.collect(key, (Writable)values.next()); // just collect first value
+ }
+ }
+
/** Construct an Injector. */
public Injector(NutchConf conf) {
super(conf);
@@ -75,6 +85,7 @@
JobConf sortJob = new JobConf(getConf());
sortJob.setInputDir(urlDir);
sortJob.setMapperClass(InjectMapper.class);
+ sortJob.setReducerClass(InjectReducer.class);
sortJob.setOutputDir(tempDir);
sortJob.setOutputFormat(SequenceFileOutputFormat.class);
@@ -83,30 +94,13 @@
JobClient.runJob(sortJob);
// merge with existing crawl db
- File newCrawlDb =
- new File(crawlDb,
- Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
- JobConf mergeJob = new JobConf(getConf());
+ JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb);
mergeJob.addInputDir(tempDir);
- mergeJob.addInputDir(new File(crawlDb, "current/"));
- mergeJob.setInputFormat(SequenceFileInputFormat.class);
- mergeJob.setInputKeyClass(UTF8.class);
- mergeJob.setInputValueClass(CrawlDatum.class);
-
- mergeJob.setReducerClass(CrawlDBReducer.class);
-
- mergeJob.setOutputDir(newCrawlDb);
- mergeJob.setOutputFormat(SequenceFileOutputFormat.class);
- mergeJob.setOutputKeyClass(UTF8.class);
- mergeJob.setOutputValueClass(CrawlDatum.class);
-
JobClient.runJob(mergeJob);
+ CrawlDb.install(mergeJob, crawlDb);
+ // clean up
NutchFileSystem fs = new JobClient(getConf()).getFs();
- fs.delete(new File(crawlDb, "old/"));
- fs.rename(new File(crawlDb, "current/"), new File(crawlDb, "old/"));
- fs.rename(newCrawlDb, new File(crawlDb, "current/"));
- fs.delete(new File(crawlDb, "old/"));
fs.delete(tempDir);
}
Added:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java?rev=179408&view=auto
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java
(added)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java
Wed Jun 1 12:54:55 2005
@@ -0,0 +1,150 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import org.apache.nutch.io.*;
+import org.apache.nutch.fs.*;
+import org.apache.nutch.mapred.*;
+import org.apache.nutch.util.*;
+import org.apache.nutch.protocol.*;
+import org.apache.nutch.parse.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.logging.*;
+
+/* Parse content in a segment. */
+public class ParseSegment
+ extends NutchConfigured implements Mapper, Reducer, OutputFormat {
+
+ public static final Logger LOG =
+ LogFormatter.getLogger(Parser.class.getName());
+
+ private float interval;
+
+ public ParseSegment() { super(null); }
+
+ public ParseSegment(NutchConf conf) {
+ super(conf);
+ }
+
+ public void configure(JobConf job) {
+ interval = job.getFloat("db.default.fetch.interval", 30f);
+ }
+
+ public void map(WritableComparable key, Writable value,
+ OutputCollector output) throws IOException {
+ Content content = (Content)value;
+ try {
+ Parser parser = ParserFactory.getParser(content.getContentType(),
+ content.getBaseUrl());
+ Parse parse = parser.getParse(content);
+
+ output.collect(key, new ParseImpl(parse.getText(), parse.getData()));
+
+ } catch (ParseException t) {
+ LOG.warning("Error parsing: "+key+": "+t.toString());
+ }
+ }
+
+ public void reduce(WritableComparable key, Iterator values,
+ OutputCollector output) throws IOException {
+ output.collect(key, (Writable)values.next()); // collect first value
+ }
+
+ public RecordWriter getRecordWriter(NutchFileSystem fs, JobConf job,
+ String name) throws IOException {
+ File text =
+ new File(new File(job.getOutputDir(), ParseText.DIR_NAME), name);
+ File data =
+ new File(new File(job.getOutputDir(), ParseData.DIR_NAME), name);
+ File crawl =
+ new File(new File(job.getOutputDir(), CrawlDatum.PARSE_DIR_NAME), name);
+
+ final MapFile.Writer textOut =
+ new MapFile.Writer(fs, text.toString(), UTF8.class, ParseText.class);
+
+ final MapFile.Writer dataOut =
+ new MapFile.Writer(fs, data.toString(), UTF8.class, ParseData.class);
+
+ final SequenceFile.Writer crawlOut =
+ new SequenceFile.Writer(fs, crawl.toString(),
+ UTF8.class, CrawlDatum.class);
+
+ return new RecordWriter() {
+
+ public void write(WritableComparable key, Writable value)
+ throws IOException {
+
+ Parse parse = (Parse)value;
+
+ textOut.append(key, new ParseText(parse.getText()));
+ dataOut.append(key, parse.getData());
+
+ // collect outlinks for subsequent db update
+ Outlink[] links = parse.getData().getOutlinks();
+ for (int i = 0; i < links.length; i++) {
+ crawlOut.append(new UTF8(links[i].getToUrl()),
+ new CrawlDatum(CrawlDatum.STATUS_LINKED,
+ interval));
+ }
+ }
+
+ public void close() throws IOException {
+ textOut.close();
+ dataOut.close();
+ crawlOut.close();
+ }
+
+ };
+
+ }
+
+ public void parse(File segment) throws IOException {
+ JobConf job = new JobConf(getConf());
+ job.setInputDir(new File(segment, Content.DIR_NAME));
+ job.setInputFormat(SequenceFileInputFormat.class);
+ job.setInputKeyClass(UTF8.class);
+ job.setInputValueClass(Content.class);
+ job.setMapperClass(ParseSegment.class);
+ job.setReducerClass(ParseSegment.class);
+
+ job.setOutputDir(segment);
+ job.setOutputFormat(ParseSegment.class);
+ job.setOutputKeyClass(UTF8.class);
+ job.setOutputValueClass(ParseImpl.class);
+
+ JobClient.runJob(job);
+ }
+
+
+ public static void main(String[] args) throws Exception {
+ File segment;
+
+ String usage = "Usage: ParseSegment segment";
+
+ if (args.length == 0) {
+ System.err.println(usage);
+ System.exit(-1);
+ }
+
+ segment = new File(args[0]);
+
+ ParseSegment parseSegment = new ParseSegment(NutchConf.get());
+ parseSegment.parse(segment);
+ }
+}
Added:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapFileOutputFormat.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapFileOutputFormat.java?rev=179408&view=auto
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapFileOutputFormat.java
(added)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapFileOutputFormat.java
Wed Jun 1 12:54:55 2005
@@ -0,0 +1,52 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.mapred;
+
+import java.io.IOException;
+import java.io.File;
+
+import org.apache.nutch.fs.NutchFileSystem;
+
+import org.apache.nutch.io.MapFile;
+import org.apache.nutch.io.WritableComparable;
+import org.apache.nutch.io.Writable;
+
+public class MapFileOutputFormat implements OutputFormat {
+
+ public RecordWriter getRecordWriter(NutchFileSystem fs, JobConf job,
+ String name) throws IOException {
+
+ File file = new File(job.getOutputDir(), name);
+
+ final MapFile.Writer out =
+ new MapFile.Writer(fs, file.toString(),
+ job.getOutputKeyClass(),
+ job.getOutputValueClass());
+
+ return new RecordWriter() {
+
+ public void write(WritableComparable key, Writable value)
+ throws IOException {
+
+ out.append(key, value);
+ }
+
+ public void close() throws IOException { out.close(); }
+ };
+ }
+}
+
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java?rev=179408&r1=179407&r2=179408&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
Wed Jun 1 12:54:55 2005
@@ -101,7 +101,10 @@
/// Auxiliary methods
/** Start processing next unique key. */
- public void nextKey() { hasNext = more; }
+ public void nextKey() {
+ while (hasNext) { next(); } // skip any unread
+ hasNext = more;
+ }
/** True iff more keys remain. */
public boolean more() { return more; }
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java?rev=179408&r1=179407&r2=179408&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
Wed Jun 1 12:54:55 2005
@@ -30,6 +30,19 @@
/** An {@link InputFormat} for {@link SequenceFile}s. */
public class SequenceFileInputFormat extends InputFormatBase {
+ protected File[] listFiles(NutchFileSystem fs, JobConf job)
+ throws IOException {
+
+ File[] files = super.listFiles(fs, job);
+ for (int i = 0; i < files.length; i++) {
+ File file = files[i];
+ if (file.isDirectory()) { // it's a MapFile
+ files[i] = new File(file, "data"); // use the data file
+ }
+ }
+ return files;
+ }
+
public RecordReader getRecordReader(NutchFileSystem fs, FileSplit split,
JobConf job) throws IOException {
Modified:
incubator/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java
URL:
http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java?rev=179408&r1=179407&r2=179408&view=diff
==============================================================================
---
incubator/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java
(original)
+++
incubator/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java
Wed Jun 1 12:54:55 2005
@@ -16,19 +16,39 @@
package org.apache.nutch.parse;
+import java.io.*;
+import org.apache.nutch.io.*;
+
+
/** The result of parsing a page's raw content.
* @see Parser#getParse(Content)
*/
-public class ParseImpl implements Parse {
- private String text;
+public class ParseImpl implements Parse, Writable {
+ private ParseText text;
private ParseData data;
+ public ParseImpl() {}
+
public ParseImpl(String text, ParseData data) {
- this.text = text;
+ this.text = new ParseText(text);
this.data = data;
}
- public String getText() { return text; }
+ public String getText() { return text.getText(); }
public ParseData getData() { return data; }
+
+ public final void write(DataOutput out) throws IOException {
+ text.write(out);
+ data.write(out);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ text = new ParseText();
+ text.readFields(in);
+
+ data = new ParseData();
+ data.readFields(in);
+ }
+
}