Author: cutting
Date: Wed Jun  1 12:54:55 2005
New Revision: 179408

URL: http://svn.apache.org/viewcvs?rev=179408&view=rev
Log:
First working version of MapReduce based parse and updatedb.  The 
MapReduce-based crawl loop is now complete.
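
For context, a minimal sketch of that loop in terms of the bin/nutch commands
touched below (paths are hypothetical, and the inject/generate argument order
is assumed rather than shown in this change):

  bin/nutch inject crawldb urls                        # seed the crawl db from a URL list
  bin/nutch generate crawldb segments                  # generate a segment with a fetch list
  bin/nutch fetch segments/20050601120000              # fetch the segment's pages
  bin/nutch parse segments/20050601120000              # parse fetched content (ParseSegment)
  bin/nutch updatedb crawldb segments/20050601120000   # merge fetch/parse output back (CrawlDb)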

Added:
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDb.java
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
      - copied, changed from r179236, incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapFileOutputFormat.java
Removed:
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java
Modified:
    incubator/nutch/branches/mapred/bin/nutch
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
    incubator/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java

Modified: incubator/nutch/branches/mapred/bin/nutch
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/bin/nutch?rev=179408&r1=179407&r2=179408&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/bin/nutch (original)
+++ incubator/nutch/branches/mapred/bin/nutch Wed Jun  1 12:54:55 2005
@@ -34,10 +34,10 @@
   echo "  generate          generate new segments to fetch"
   echo "  fetch             fetch a segment's pages"
   echo "  parse             parse a segment's pages"
+  echo "  updatedb          update crawl db from segments after fetching"
   echo "  index             run the indexer on a segment's fetcher output"
   echo "  merge             merge several segment indexes"
   echo "  dedup             remove duplicates from a set of segment indexes"
-  echo "  updatedb          update db from segments after fetching"
   echo "  updatesegs        update segments with link data from the db"
   echo "  mergesegs         merge multiple segments into a single segment"
   echo "  readdb            examine arbitrary fields of the database"
@@ -135,15 +135,15 @@
 elif [ "$COMMAND" = "fetch" ] ; then
   CLASS=org.apache.nutch.crawl.Fetcher
 elif [ "$COMMAND" = "parse" ] ; then
-  CLASS=org.apache.nutch.tools.ParseSegment
+  CLASS=org.apache.nutch.crawl.ParseSegment
+elif [ "$COMMAND" = "updatedb" ] ; then
+  CLASS=org.apache.nutch.crawl.CrawlDb
 elif [ "$COMMAND" = "index" ] ; then
   CLASS=org.apache.nutch.indexer.IndexSegment
 elif [ "$COMMAND" = "merge" ] ; then
   CLASS=org.apache.nutch.indexer.IndexMerger
 elif [ "$COMMAND" = "dedup" ] ; then
   CLASS=org.apache.nutch.indexer.DeleteDuplicates
-elif [ "$COMMAND" = "updatedb" ] ; then
-  CLASS=org.apache.nutch.tools.UpdateDatabaseTool
 elif [ "$COMMAND" = "updatesegs" ] ; then
   CLASS=org.apache.nutch.tools.UpdateSegmentsFromDb
 elif [ "$COMMAND" = "mergesegs" ] ; then

Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java?rev=179408&r1=179407&r2=179408&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java Wed Jun  1 12:54:55 2005
@@ -25,7 +25,11 @@
 
 /* The crawl state of a url. */
 public class CrawlDatum implements WritableComparable, Cloneable {
-  public static final String DIR_NAME = "crawl";
+  public static final String DB_DIR_NAME = "current";
+
+  public static final String GENERATE_DIR_NAME = "crawl_generate";
+  public static final String FETCH_DIR_NAME = "crawl_fetch";
+  public static final String PARSE_DIR_NAME = "crawl_parse";
 
   private final static byte CUR_VERSION = 1;
 

Added: incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDb.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDb.java?rev=179408&view=auto
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDb.java (added)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDb.java Wed Jun  1 12:54:55 2005
@@ -0,0 +1,99 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.*;
+import java.util.*;
+import java.util.logging.*;
+
+import org.apache.nutch.io.*;
+import org.apache.nutch.fs.*;
+import org.apache.nutch.net.*;
+import org.apache.nutch.util.*;
+import org.apache.nutch.mapred.*;
+import org.apache.nutch.parse.*;
+
+/** Updates an existing crawl db with the fetch and parse output of a
+ * segment, merging new and updated entries.  Run after fetching. */
+public class CrawlDb extends NutchConfigured {
+
+  public static final Logger LOG =
+    LogFormatter.getLogger("org.apache.nutch.crawl.CrawlDb");
+
+  /** Construct a CrawlDb. */
+  public CrawlDb(NutchConf conf) {
+    super(conf);
+  }
+
+  public void update(File crawlDb, File segment) throws IOException {
+    JobConf job = CrawlDb.createJob(getConf(), crawlDb);
+    job.addInputDir(new File(segment, CrawlDatum.FETCH_DIR_NAME));
+    job.addInputDir(new File(segment, CrawlDatum.PARSE_DIR_NAME));
+    JobClient.runJob(job);
+    CrawlDb.install(job, crawlDb);
+  }
+
+  public static JobConf createJob(NutchConf config, File crawlDb) {
+    File newCrawlDb =
+      new File(crawlDb,
+               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf job = new JobConf(config);
+
+    job.setInt("partition.url.by.host.seed", new Random().nextInt());
+    job.setPartitionerClass(PartitionUrlByHost.class);
+
+    job.addInputDir(new File(crawlDb, CrawlDatum.DB_DIR_NAME));
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(UTF8.class);
+    job.setInputValueClass(CrawlDatum.class);
+
+    job.setReducerClass(CrawlDbReducer.class);
+
+    job.setOutputDir(newCrawlDb);
+    job.setOutputFormat(MapFileOutputFormat.class);
+    job.setOutputKeyClass(UTF8.class);
+    job.setOutputValueClass(CrawlDatum.class);
+
+    return job;
+  }
+
+  public static void install(JobConf job, File crawlDb) throws IOException {
+    File newCrawlDb = job.getOutputDir();
+    NutchFileSystem fs = new JobClient(job).getFs();
+    File old = new File(crawlDb, "old");
+    File current = new File(crawlDb, CrawlDatum.DB_DIR_NAME);
+    fs.delete(old);
+    fs.rename(current, old);
+    fs.rename(newCrawlDb, current);
+    fs.delete(old);
+  }
+
+  public static void main(String[] args) throws Exception {
+    CrawlDb crawlDb = new CrawlDb(NutchConf.get());
+    
+    if (args.length < 2) {
+      System.err.println("Usage: <crawldb> <segment>");
+      return;
+    }
+    
+    crawlDb.update(new File(args[0]), new File(args[1]));
+  }
+
+
+
+}
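
The directory swap performed by install() above is roughly equivalent to the
following shell sequence (an analogy only; the real code goes through
NutchFileSystem, and the crawldb path and job output name are hypothetical):

  rm -rf crawldb/old
  mv crawldb/current crawldb/old
  mv crawldb/<random-job-output-dir> crawldb/current
  rm -rf crawldb/old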

Copied: incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java (from r179236, incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java)
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java?p2=incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java&p1=incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java&r1=179236&r2=179408&rev=179408&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDBReducer.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java Wed Jun  1 12:54:55 2005
@@ -24,7 +24,7 @@
 import org.apache.nutch.mapred.*;
 
 /** Merge new page entries with existing entries. */
-public class CrawlDBReducer implements Reducer {
+public class CrawlDbReducer implements Reducer {
   private int retryMax;
 
   public void configure(JobConf job) {

Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java?rev=179408&r1=179407&r2=179408&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java Wed Jun  1 12:54:55 2005
@@ -173,7 +173,7 @@
 
     job.setInt("fetcher.threads.fetch", threads);
 
-    job.setInputDir(new File(segment, "fetchlist"));
+    job.setInputDir(new File(segment, CrawlDatum.GENERATE_DIR_NAME));
     job.setInputFormat(SequenceFileInputFormat.class);
     job.setInputKeyClass(UTF8.class);
     job.setInputValueClass(CrawlDatum.class);

Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java?rev=179408&r1=179407&r2=179408&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/FetcherOutputFormat.java Wed Jun  1 12:54:55 2005
@@ -38,13 +38,13 @@
   public RecordWriter getRecordWriter(NutchFileSystem fs, JobConf job,
                                       String name) throws IOException {
 
-    File crawl =
-      new File(new File(job.getOutputDir(), CrawlDatum.DIR_NAME), name);
+    File fetch =
+      new File(new File(job.getOutputDir(), CrawlDatum.FETCH_DIR_NAME), name);
     File content =
       new File(new File(job.getOutputDir(), Content.DIR_NAME), name);
 
-    final MapFile.Writer crawlOut =
-      new MapFile.Writer(fs, crawl.toString(), UTF8.class, CrawlDatum.class);
+    final MapFile.Writer fetchOut =
+      new MapFile.Writer(fs, fetch.toString(), UTF8.class, CrawlDatum.class);
     
     final MapFile.Writer contentOut =
       new MapFile.Writer(fs, content.toString(), UTF8.class, Content.class);
@@ -56,12 +56,12 @@
 
           FetcherOutput fo = (FetcherOutput)value;
           
-          crawlOut.append(key, fo.getCrawlDatum());
+          fetchOut.append(key, fo.getCrawlDatum());
           contentOut.append(key, fo.getContent());
         }
 
         public void close() throws IOException {
-          crawlOut.close();
+          fetchOut.close();
           contentOut.close();
         }
 

Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java?rev=179408&r1=179407&r2=179408&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java Wed Jun  1 12:54:55 2005
@@ -109,7 +109,7 @@
                Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
 
     File segment = new File(segments, getDate());
-    File output = new File(segment, "fetchlist");
+    File output = new File(segment, CrawlDatum.GENERATE_DIR_NAME);
 
     // map to inverted subset due for fetch, sort by link count
     JobConf job = new JobConf(getConf());
@@ -117,7 +117,7 @@
     job.setLong("crawl.gen.curTime", curTime);
     job.setLong("crawl.gen.limit", topN / job.getNumReduceTasks());
 
-    job.setInputDir(new File(dbDir, "current"));
+    job.setInputDir(new File(dbDir, CrawlDatum.DB_DIR_NAME));
     job.setInputFormat(SequenceFileInputFormat.class);
     job.setInputKeyClass(UTF8.class);
     job.setInputValueClass(CrawlDatum.class);
@@ -191,7 +191,7 @@
 
     LOG.info("Generator started");
     if (topN != Long.MAX_VALUE)
-      LOG.info("topN:" + topN);
+      LOG.info("topN: " + topN);
     Generator gen = new Generator(NutchConf.get(), dbDir);
     gen.generate(segmentsDir, numFetchers, topN, curTime);
     LOG.info("Generator completed");

Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java?rev=179408&r1=179407&r2=179408&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java Wed Jun  1 12:54:55 2005
@@ -61,6 +61,16 @@
     }
   }
 
+  /** Combine multiple new entries for a url. */
+  public static class InjectReducer implements Reducer {
+    public void configure(JobConf job) {}
+
+    public void reduce(WritableComparable key, Iterator values,
+                       OutputCollector output) throws IOException {
+      output.collect(key, (Writable)values.next()); // just collect first value
+    }
+  }
+
   /** Construct an Injector. */
   public Injector(NutchConf conf) {
     super(conf);
@@ -75,6 +85,7 @@
     JobConf sortJob = new JobConf(getConf());
     sortJob.setInputDir(urlDir);
     sortJob.setMapperClass(InjectMapper.class);
+    sortJob.setReducerClass(InjectReducer.class);
 
     sortJob.setOutputDir(tempDir);
     sortJob.setOutputFormat(SequenceFileOutputFormat.class);
@@ -83,30 +94,13 @@
     JobClient.runJob(sortJob);
 
     // merge with existing crawl db
-    File newCrawlDb =
-      new File(crawlDb,
-               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
-    JobConf mergeJob = new JobConf(getConf());
+    JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb);
     mergeJob.addInputDir(tempDir);
-    mergeJob.addInputDir(new File(crawlDb, "current/"));
-    mergeJob.setInputFormat(SequenceFileInputFormat.class);
-    mergeJob.setInputKeyClass(UTF8.class);
-    mergeJob.setInputValueClass(CrawlDatum.class);
-
-    mergeJob.setReducerClass(CrawlDBReducer.class);
-
-    mergeJob.setOutputDir(newCrawlDb);
-    mergeJob.setOutputFormat(SequenceFileOutputFormat.class);
-    mergeJob.setOutputKeyClass(UTF8.class);
-    mergeJob.setOutputValueClass(CrawlDatum.class);
-
     JobClient.runJob(mergeJob);
+    CrawlDb.install(mergeJob, crawlDb);
 
+    // clean up
     NutchFileSystem fs = new JobClient(getConf()).getFs();
-    fs.delete(new File(crawlDb, "old/"));
-    fs.rename(new File(crawlDb, "current/"), new File(crawlDb, "old/"));
-    fs.rename(newCrawlDb, new File(crawlDb, "current/"));
-    fs.delete(new File(crawlDb, "old/"));
     fs.delete(tempDir);
 
   }

Added: incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java?rev=179408&view=auto
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java (added)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java Wed Jun  1 12:54:55 2005
@@ -0,0 +1,150 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import org.apache.nutch.io.*;
+import org.apache.nutch.fs.*;
+import org.apache.nutch.mapred.*;
+import org.apache.nutch.util.*;
+import org.apache.nutch.protocol.*;
+import org.apache.nutch.parse.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.logging.*;
+
+/* Parse content in a segment. */
+public class ParseSegment 
+  extends NutchConfigured implements Mapper, Reducer, OutputFormat {
+
+  public static final Logger LOG =
+    LogFormatter.getLogger(Parser.class.getName());
+
+  private float interval;
+
+  public ParseSegment() { super(null); }
+
+  public ParseSegment(NutchConf conf) {
+    super(conf);
+  }
+
+  public void configure(JobConf job) {
+    interval = job.getFloat("db.default.fetch.interval", 30f);
+  }
+
+  public void map(WritableComparable key, Writable value,
+                  OutputCollector output) throws IOException {
+    Content content = (Content)value;
+    try {
+      Parser parser = ParserFactory.getParser(content.getContentType(),
+                                              content.getBaseUrl());
+      Parse parse = parser.getParse(content);
+      
+      output.collect(key, new ParseImpl(parse.getText(), parse.getData()));
+      
+    } catch (ParseException t) {
+      LOG.warning("Error parsing: "+key+": "+t.toString());
+    }
+  }
+
+  public void reduce(WritableComparable key, Iterator values,
+                     OutputCollector output) throws IOException {
+    output.collect(key, (Writable)values.next()); // collect first value
+  }
+
+  public RecordWriter getRecordWriter(NutchFileSystem fs, JobConf job,
+                                      String name) throws IOException {
+    File text =
+      new File(new File(job.getOutputDir(), ParseText.DIR_NAME), name);
+    File data =
+      new File(new File(job.getOutputDir(), ParseData.DIR_NAME), name);
+    File crawl =
+      new File(new File(job.getOutputDir(), CrawlDatum.PARSE_DIR_NAME), name);
+    
+    final MapFile.Writer textOut =
+      new MapFile.Writer(fs, text.toString(), UTF8.class, ParseText.class);
+    
+    final MapFile.Writer dataOut =
+      new MapFile.Writer(fs, data.toString(), UTF8.class, ParseData.class);
+    
+    final SequenceFile.Writer crawlOut =
+      new SequenceFile.Writer(fs, crawl.toString(),
+                              UTF8.class, CrawlDatum.class);
+    
+    return new RecordWriter() {
+        
+        public void write(WritableComparable key, Writable value)
+          throws IOException {
+          
+          Parse parse = (Parse)value;
+          
+          textOut.append(key, new ParseText(parse.getText()));
+          dataOut.append(key, parse.getData());
+
+          // collect outlinks for subsequent db update
+          Outlink[] links = parse.getData().getOutlinks();
+          for (int i = 0; i < links.length; i++) {
+            crawlOut.append(new UTF8(links[i].getToUrl()),
+                            new CrawlDatum(CrawlDatum.STATUS_LINKED,
+                                           interval));
+          }
+        }
+        
+        public void close() throws IOException {
+          textOut.close();
+          dataOut.close();
+          crawlOut.close();
+        }
+        
+      };
+    
+  }      
+
+  public void parse(File segment) throws IOException {
+    JobConf job = new JobConf(getConf());
+    job.setInputDir(new File(segment, Content.DIR_NAME));
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(UTF8.class);
+    job.setInputValueClass(Content.class);
+    job.setMapperClass(ParseSegment.class);
+    job.setReducerClass(ParseSegment.class);
+    
+    job.setOutputDir(segment);
+    job.setOutputFormat(ParseSegment.class);
+    job.setOutputKeyClass(UTF8.class);
+    job.setOutputValueClass(ParseImpl.class);
+
+    JobClient.runJob(job);
+  }
+
+
+  public static void main(String[] args) throws Exception {
+    File segment;
+
+    String usage = "Usage: ParseSegment segment";
+
+    if (args.length == 0) {
+      System.err.println(usage);
+      System.exit(-1);
+    }
+      
+    segment = new File(args[0]);
+
+    ParseSegment parseSegment = new ParseSegment(NutchConf.get());
+    parseSegment.parse(segment);
+  }
+}
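
After fetch and parse, a segment directory should contain roughly the following
subdirectories (a hypothetical layout; the parse_text and parse_data names are
assumed to be the values of ParseText.DIR_NAME and ParseData.DIR_NAME, which
are not shown in this diff):

  segments/20050601120000/content          # fetched raw content
  segments/20050601120000/crawl_generate   # generated fetch list
  segments/20050601120000/crawl_fetch      # per-URL fetch status
  segments/20050601120000/parse_text       # extracted text
  segments/20050601120000/parse_data       # parse metadata and outlinks
  segments/20050601120000/crawl_parse      # outlink CrawlDatum entries consumed by updatedb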

Added: incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapFileOutputFormat.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapFileOutputFormat.java?rev=179408&view=auto
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapFileOutputFormat.java (added)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapFileOutputFormat.java Wed Jun  1 12:54:55 2005
@@ -0,0 +1,52 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.mapred;
+
+import java.io.IOException;
+import java.io.File;
+
+import org.apache.nutch.fs.NutchFileSystem;
+
+import org.apache.nutch.io.MapFile;
+import org.apache.nutch.io.WritableComparable;
+import org.apache.nutch.io.Writable;
+
+public class MapFileOutputFormat implements OutputFormat {
+
+  public RecordWriter getRecordWriter(NutchFileSystem fs, JobConf job,
+                                      String name) throws IOException {
+
+    File file = new File(job.getOutputDir(), name);
+
+    final MapFile.Writer out =
+      new MapFile.Writer(fs, file.toString(),
+                         job.getOutputKeyClass(),
+                         job.getOutputValueClass());
+
+    return new RecordWriter() {
+
+        public void write(WritableComparable key, Writable value)
+          throws IOException {
+
+          out.append(key, value);
+        }
+
+        public void close() throws IOException { out.close(); }
+      };
+  }      
+}
+
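
MapFileOutputFormat writes each output partition as a MapFile, i.e. a directory
containing a sorted "data" SequenceFile plus an "index" file; the
SequenceFileInputFormat change below lets such directories be read back as
ordinary sequence-file input by pointing at their "data" files. A hypothetical
on-disk layout (the part file name is illustrative):

  crawldb/current/part-00000/data
  crawldb/current/part-00000/index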

Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java?rev=179408&r1=179407&r2=179408&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java Wed Jun  1 12:54:55 2005
@@ -101,7 +101,10 @@
     /// Auxiliary methods
 
     /** Start processing next unique key. */
-    public void nextKey() { hasNext = more; }
+    public void nextKey() {
+      while (hasNext) { next(); }                 // skip any unread
+      hasNext = more;
+    }
 
     /** True iff more keys remain. */
     public boolean more() { return more; }
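
The reason nextKey() now drains unread values: reduce input arrives as one flat,
key-sorted stream, so if a reducer consumes only some of a key's values (as
InjectReducer and ParseSegment.reduce above do), the leftovers must be skipped
before the next key can start. A self-contained illustration of that invariant,
not Nutch code:

  public class DrainDemo {
    public static void main(String[] args) {
      // a flat, key-sorted stream of (key, value) records, as a reduce task sees it
      String[][] stream = {
        {"a", "1"}, {"a", "2"}, {"a", "3"},
        {"b", "4"}, {"b", "5"}
      };
      int pos = 0;
      while (pos < stream.length) {
        String key = stream[pos][0];
        // the reducer reads only the first value for each key
        System.out.println(key + " -> " + stream[pos][1]);
        // drain the remaining values for this key, mirroring the nextKey() fix;
        // without this, "a"'s unread values would be mistaken for new keys
        while (pos < stream.length && stream[pos][0].equals(key)) {
          pos++;
        }
      }
    }
  }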

Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java?rev=179408&r1=179407&r2=179408&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java Wed Jun  1 12:54:55 2005
@@ -30,6 +30,19 @@
 /** An {@link InputFormat} for {@link SequenceFile}s. */
 public class SequenceFileInputFormat extends InputFormatBase {
 
+  protected File[] listFiles(NutchFileSystem fs, JobConf job)
+    throws IOException {
+
+    File[] files = super.listFiles(fs, job);
+    for (int i = 0; i < files.length; i++) {
+      File file = files[i];
+      if (file.isDirectory()) {                   // it's a MapFile
+        files[i] = new File(file, "data");        // use the data file
+      }
+    }
+    return files;
+  }
+
   public RecordReader getRecordReader(NutchFileSystem fs, FileSplit split,
                                       JobConf job) throws IOException {
 

Modified: incubator/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java?rev=179408&r1=179407&r2=179408&view=diff
==============================================================================
--- incubator/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java (original)
+++ incubator/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java Wed Jun  1 12:54:55 2005
@@ -16,19 +16,39 @@
 
 package org.apache.nutch.parse;
 
+import java.io.*;
+import org.apache.nutch.io.*;
+
+
 /** The result of parsing a page's raw content.
  * @see Parser#getParse(Content)
  */
-public class ParseImpl implements Parse {
-  private String text;
+public class ParseImpl implements Parse, Writable {
+  private ParseText text;
   private ParseData data;
 
+  public ParseImpl() {}
+
   public ParseImpl(String text, ParseData data) {
-    this.text = text;
+    this.text = new ParseText(text);
     this.data = data;
   }
 
-  public String getText() { return text; }
+  public String getText() { return text.getText(); }
 
   public ParseData getData() { return data; }
+  
+  public final void write(DataOutput out) throws IOException {
+    text.write(out);
+    data.write(out);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    text = new ParseText();
+    text.readFields(in);
+
+    data = new ParseData();
+    data.readFields(in);
+  }
+
 }



