Author: ab
Date: Mon May  8 14:58:18 2006
New Revision: 405183

URL: http://svn.apache.org/viewcvs?rev=405183&view=rev
Log:
Add the following tools (see also NUTCH-264):

* CrawlDbMerger: merges one or more crawldb-s, with optional filtering

* LinkDbMerger: merges one or more linkdb-s, with optional filtering

* SegmentMerger: merges one or more segments, with optional filtering
  and slicing

Development of these tools has been sponsored by houxou.com - thank you! 

Added:
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbMerger.java   
(with props)
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java   
(with props)
    lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java   
(with props)
    lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbMerger.java   
(with props)
    lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestLinkDbMerger.java   
(with props)
Modified:
    lucene/nutch/trunk/bin/nutch
    lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseData.java
    lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java

Modified: lucene/nutch/trunk/bin/nutch
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/trunk/bin/nutch?rev=405183&r1=405182&r2=405183&view=diff
==============================================================================
--- lucene/nutch/trunk/bin/nutch (original)
+++ lucene/nutch/trunk/bin/nutch Mon May  8 14:58:18 2006
@@ -30,14 +30,17 @@
   echo "where COMMAND is one of:"
   echo "  crawl             one-step crawler for intranets"
   echo "  readdb            read / dump crawl db"
+  echo "  mergedb           merge crawldb-s, with optional filtering"
   echo "  readlinkdb        read / dump link db"
   echo "  inject            inject new urls into the database"
   echo "  generate          generate new segments to fetch"
   echo "  fetch             fetch a segment's pages"
   echo "  parse             parse a segment's pages"
   echo "  segread           read / dump segment data"
+  echo "  mergesegs         merge several segments, with optional filtering 
and slicing"
   echo "  updatedb          update crawl db from segments after fetching"
   echo "  invertlinks       create a linkdb from parsed segments"
+  echo "  mergelinkdb       merge linkdb-s, with optional filtering"
   echo "  index             run the indexer on parsed segments and linkdb"
   echo "  merge             merge several segment indexes"
   echo "  dedup             remove duplicates from a set of segment indexes"
@@ -131,14 +134,20 @@
   CLASS=org.apache.nutch.parse.ParseSegment
 elif [ "$COMMAND" = "readdb" ] ; then
   CLASS=org.apache.nutch.crawl.CrawlDbReader
+elif [ "$COMMAND" = "mergedb" ] ; then
+  CLASS=org.apache.nutch.crawl.CrawlDbMerger
 elif [ "$COMMAND" = "readlinkdb" ] ; then
   CLASS=org.apache.nutch.crawl.LinkDbReader
 elif [ "$COMMAND" = "segread" ] ; then
   CLASS=org.apache.nutch.segment.SegmentReader
+elif [ "$COMMAND" = "mergesegs" ] ; then
+  CLASS=org.apache.nutch.segment.SegmentMerger
 elif [ "$COMMAND" = "updatedb" ] ; then
   CLASS=org.apache.nutch.crawl.CrawlDb
 elif [ "$COMMAND" = "invertlinks" ] ; then
   CLASS=org.apache.nutch.crawl.LinkDb
+elif [ "$COMMAND" = "mergelinkdb" ] ; then
+  CLASS=org.apache.nutch.crawl.LinkDbMerger
 elif [ "$COMMAND" = "index" ] ; then
   CLASS=org.apache.nutch.indexer.Indexer
 elif [ "$COMMAND" = "dedup" ] ; then

Added: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbMerger.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbMerger.java?rev=405183&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbMerger.java 
(added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbMerger.java Mon 
May  8 14:58:18 2006
@@ -0,0 +1,163 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.*;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+/**
+ * This tool merges several CrawlDb-s into one, optionally filtering
+ * URLs through the current URLFilters, to skip prohibited
+ * pages.
+ * 
+ * <p>It's possible to use this tool just for filtering - in that case
+ * only one CrawlDb should be specified in arguments.</p>
+ * <p>If more than one CrawlDb contains information about the same URL,
+ * only the most recent version is retained, as determined by the
+ * value of [EMAIL PROTECTED] 
org.apache.nutch.crawl.CrawlDatum#getFetchTime()}.
+ * However, all metadata information from all versions is accumulated,
+ * with newer values taking precedence over older values.
+ * 
+ * @author Andrzej Bialecki
+ */
+public class CrawlDbMerger extends Configured {
+  private static final Logger LOG = 
Logger.getLogger(CrawlDbMerger.class.getName());
+
+  public static class Merger extends MapReduceBase implements Reducer {
+    private URLFilters filters = null;
+    MapWritable meta = new MapWritable();
+
+    public void close() throws IOException {}
+
+    public void configure(JobConf conf) {
+      if (conf.getBoolean("crawldb.merger.urlfilters", false))
+        filters = new URLFilters(conf);
+    }
+
+    public void reduce(WritableComparable key, Iterator values, 
OutputCollector output, Reporter reporter)
+            throws IOException {
+      if (filters != null) {
+        try {
+          if (filters.filter(((UTF8) key).toString()) == null)
+            return;
+        } catch (Exception e) {
+          LOG.fine("Can't filter " + key + ": " + e);
+        }
+      }
+      CrawlDatum res = null;
+      long resTime = 0L;
+      meta.clear();
+      while (values.hasNext()) {
+        CrawlDatum val = (CrawlDatum) values.next();
+        if (res == null) {
+          res = val;
+          resTime = res.getFetchTime() - Math.round(res.getFetchInterval() * 
3600 * 24 * 1000);
+          meta.putAll(res.getMetaData());
+          continue;
+        }
+        // compute last fetch time, and pick the latest
+        long valTime = val.getFetchTime() - Math.round(val.getFetchInterval() 
* 3600 * 24 * 1000);
+        if (valTime > resTime) {
+          // collect all metadata, newer values override older values
+          meta.putAll(val.getMetaData());
+          res = val;
+          resTime = res.getFetchTime() - Math.round(res.getFetchInterval() * 
3600 * 24 * 1000);
+        } else {
+          // insert older metadata before newer
+          val.getMetaData().putAll(meta);
+          meta = val.getMetaData();
+        }
+      }
+      res.setMetaData(meta);
+      output.collect(key, res);
+    }
+  }
+
+  public CrawlDbMerger(Configuration conf) {
+    super(conf);
+  }
+
+  public void merge(File output, File[] dbs, boolean filter) throws Exception {
+    JobConf job = createMergeJob(getConf(), output);
+    job.setBoolean("crawldb.merger.urlfilters", filter);
+    for (int i = 0; i < dbs.length; i++) {
+      job.addInputDir(new File(dbs[i], CrawlDatum.DB_DIR_NAME));
+    }
+    JobClient.runJob(job);
+    FileSystem fs = FileSystem.get(getConf());
+    fs.mkdirs(output);
+    fs.rename(job.getOutputDir(), new File(output, CrawlDatum.DB_DIR_NAME));
+  }
+
+  public static JobConf createMergeJob(Configuration conf, File output) {
+    File newCrawlDb = new File("crawldb-merge-" + Integer.toString(new 
Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf job = new NutchJob(conf);
+    job.setJobName("crawldb merge " + output);
+
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(UTF8.class);
+    job.setInputValueClass(CrawlDatum.class);
+
+    job.setReducerClass(Merger.class);
+
+    job.setOutputDir(newCrawlDb);
+    job.setOutputFormat(MapFileOutputFormat.class);
+    job.setOutputKeyClass(UTF8.class);
+    job.setOutputValueClass(CrawlDatum.class);
+
+    return job;
+  }
+
+  /**
+   * @param args
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println("CrawlDbMerger output_crawldb crawldb1 [crawldb2 
crawldb3 ...] [-filter]");
+      System.err.println("\toutput_crawldb\toutput CrawlDb");
+      System.err.println("\tcrawldb1 ...\tinput CrawlDb-s");
+      System.err.println("\t-filter\tuse URLFilters on urls in the 
crawldb(s)");
+      return;
+    }
+    Configuration conf = NutchConfiguration.create();
+    File output = new File(args[0]);
+    ArrayList dbs = new ArrayList();
+    boolean filter = false;
+    for (int i = 1; i < args.length; i++) {
+      if (args[i].equals("-filter")) {
+        filter = true;
+        continue;
+      }
+      dbs.add(new File(args[i]));
+    }
+    CrawlDbMerger merger = new CrawlDbMerger(conf);
+    merger.merge(output, (File[]) dbs.toArray(new File[dbs.size()]), filter);
+  }
+}

Propchange: 
lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbMerger.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java?rev=405183&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java Mon 
May  8 14:58:18 2006
@@ -0,0 +1,89 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.crawl;
+
+import java.io.File;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.nutch.util.NutchConfiguration;
+
+/**
+ * This tool merges several LinkDb-s into one, optionally filtering
+ * URLs through the current URLFilters, to skip prohibited URLs and
+ * links.
+ * 
+ * <p>It's possible to use this tool just for filtering - in that case
+ * only one LinkDb should be specified in arguments.</p>
+ * <p>If more than one LinkDb contains information about the same URL,
+ * all inlinks are accumulated, but only at most <code>db.max.inlinks</code>
+ * inlinks will ever be added.</p>
+ * <p>If activated, URLFilters will be applied to both the target URLs and
+ * to any incoming link URL. If a target URL is prohibited, all
+ * inlinks to that target will be removed, including the target URL. If
+ * some of incoming links are prohibited, only they will be removed, and they
+ * won't count when checking the above-mentioned maximum limit.
+ * 
+ * @author Andrzej Bialecki
+ */
+public class LinkDbMerger extends Configured {
+
+  public LinkDbMerger(Configuration conf) {
+    super(conf);
+  }
+  
+  public void merge(File output, File[] dbs, boolean filter) throws Exception {
+    JobConf job = LinkDb.createMergeJob(getConf(), output);
+    job.setBoolean("linkdb.merger.urlfilters", filter);
+    for (int i = 0; i < dbs.length; i++) {
+      job.addInputDir(new File(dbs[i], LinkDb.CURRENT_NAME));      
+    }
+    JobClient.runJob(job);
+    FileSystem fs = FileSystem.get(getConf());
+    fs.mkdirs(output);
+    fs.rename(job.getOutputDir(), new File(output, LinkDb.CURRENT_NAME));
+  }
+  /**
+   * @param args
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println("LinkDbMerger output_linkdb linkdb1 [linkdb2 linkdb3 
...] [-filter]");
+      System.err.println("\toutput_linkdb\toutput LinkDb");
+      System.err.println("\tlinkdb1 ...\tinput LinkDb-s");
+      System.err.println("\t-filter\tuse URLFilters on both fromUrls and 
toUrls in linkdb(s)");
+      return;
+    }
+    Configuration conf = NutchConfiguration.create();
+    File output = new File(args[0]);
+    ArrayList dbs = new ArrayList();
+    boolean filter = false;
+    for (int i = 1; i < args.length; i++) {
+      if (args[i].equals("-filter")) {
+        filter = true;
+        continue;
+      }
+      dbs.add(new File(args[i]));
+    }
+    LinkDbMerger merger = new LinkDbMerger(conf);
+    merger.merge(output, (File[])dbs.toArray(new File[dbs.size()]), filter);
+  }
+
+}

Propchange: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseData.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseData.java?rev=405183&r1=405182&r2=405183&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseData.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseData.java Mon May  
8 14:58:18 2006
@@ -87,6 +87,10 @@
    */
   public Metadata getParseMeta() { return parseMeta; }
   
+  public void setParseMeta(Metadata parseMeta) {
+    this.parseMeta = parseMeta;
+  }
+  
   /**
    * Get a metadata single value.
    * This method first looks for the metadata value in the parse metadata. If 
no

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java?rev=405183&r1=405182&r2=405183&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java 
(original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java Mon May  
8 14:58:18 2006
@@ -146,6 +146,12 @@
     return metadata;
   }
 
+  /** Other protocol-specific data. */
+  public void setMetadata(Metadata metadata) {
+    ensureInflated();
+    this.metadata = metadata;
+  }
+
   public boolean equals(Object o) {
     ensureInflated();
     if (!(o instanceof Content)){

Added: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java?rev=405183&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java 
(added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java Mon 
May  8 14:58:18 2006
@@ -0,0 +1,647 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.segment;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.*;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.util.NutchConfiguration;
+
+/**
+ * This tool takes several segments and merges their data together. Only the
+ * latest versions of data is retained.
+ * <p>
+ * Optionally, you can apply current URLFilters to remove prohibited URL-s.
+ * </p>
+ * <p>
+ * Also, it's possible to slice the resulting segment into chunks of fixed 
size.
+ * </p>
+ * <h3>Important Notes</h3>
+ * <h4>Which parts are merged?</h4>
+ * <p>It doesn't make sense to merge data from segments, which are at 
different stages
+ * of processing (e.g. one unfetched segment, one fetched but not parsed, and
+ * one fetched and parsed). Therefore, prior to merging, the tool will 
determine
+ * the lowest common set of input data, and only this data will be merged.
+ * This may have some unintended consequences:
+ * e.g. if majority of input segments are fetched and parsed, but one of them 
is unfetched,
+ * the tool will fall back to just merging fetchlists, and it will skip all 
other data
+ * from all segments.</p>
+ * <h4>Merging fetchlists</h4>
+ * <p>Merging segments, which contain just fetchlists (i.e. prior to fetching)
+ * is not recommended, because this tool (unlike the [EMAIL PROTECTED] 
org.apache.nutch.crawl.Generator}
+ * doesn't ensure that fetchlist parts for each map task are disjoint.</p>
+ * <p>
+ * <h4>Duplicate content</h4>
+ * Merging segments removes older content whenever possible (see below). 
However,
+ * this is NOT the same as de-duplication, which in addition removes identical
+ * content found at different URL-s. In other words, running DeleteDuplicates 
is
+ * still necessary.
+ * </p>
+ * <p>For some types of data (especially ParseText) it's not possible to 
determine
+ * which version is really older. Therefore the tool always uses segment names 
as
+ * timestamps, for all types of input data. Segment names are compared in 
forward lexicographic
+ * order (0-9a-zA-Z), and data from segments with "higher" names will prevail.
+ * It follows then that it is extremely important that segments be named in an
+ * increasing lexicographic order as their creation time increases.</p>
+ * <p>
+ * <h4>Merging and indexes</h4>
+ * Merged segment gets a different name. Since Indexer embeds segment names in
+ * indexes, any indexes originally created for the input segments will NOT 
work with the
+ * merged segment. Newly created merged segment(s) need to be indexed afresh.
+ * This tool doesn't use existing indexes in any way, so if
+ * you plan to merge segments you don't have to index them prior to merging.
+ * 
+ * 
+ * @author Andrzej Bialecki
+ */
+public class SegmentMerger extends Configured implements Mapper, Reducer {
+  private static final Logger LOG = 
Logger.getLogger(SegmentMerger.class.getName());
+
+  private static final UTF8 SEGMENT_PART_KEY = new UTF8("_PaRt_");
+  private static final UTF8 SEGMENT_NAME_KEY = new UTF8("_NaMe_");
+  private static final String nameMarker = SEGMENT_NAME_KEY.toString();
+  private static final UTF8 SEGMENT_SLICE_KEY = new UTF8("_SlIcE_");
+  private static final String sliceMarker = SEGMENT_SLICE_KEY.toString();
+
+  private URLFilters filters = null;
+  private long sliceSize = -1;
+  private long curCount = 0;
+  
+  /**
+   * Wraps inputs in an [EMAIL PROTECTED] ObjectWritable}, to permit merging 
different
+   * types in reduce.
+   */
+  public static class ObjectInputFormat extends SequenceFileInputFormat {
+    public RecordReader getRecordReader(FileSystem fs, FileSplit split, 
JobConf job, Reporter reporter)
+            throws IOException {
+
+      reporter.setStatus(split.toString());
+      // find part name
+      String dir = split.getFile().toString().replace('\\', '/');
+      int idx = dir.lastIndexOf("/part-");
+      if (idx == -1) {
+        throw new IOException("Cannot determine segment part: " + dir);
+      }
+      dir = dir.substring(0, idx);
+      idx = dir.lastIndexOf('/');
+      if (idx == -1) {
+        throw new IOException("Cannot determine segment part: " + dir);
+      }
+      final String part = dir.substring(idx + 1);
+      // find segment name
+      dir = dir.substring(0, idx);
+      idx = dir.lastIndexOf('/');
+      if (idx == -1) {
+        throw new IOException("Cannot determine segment name: " + dir);
+      }
+      final String segment = dir.substring(idx + 1);
+
+      return new SequenceFileRecordReader(job, split) {
+        public synchronized boolean next(Writable key, Writable value) throws 
IOException {
+          ObjectWritable wrapper = (ObjectWritable) value;
+          try {
+            wrapper.set(getValueClass().newInstance());
+          } catch (Exception e) {
+            throw new IOException(e.toString());
+          }
+          boolean res = super.next(key, (Writable) wrapper.get());
+          Object o = wrapper.get();
+          if (o instanceof CrawlDatum) {
+            // record which part of segment this comes from
+            ((CrawlDatum)o).getMetaData().put(SEGMENT_PART_KEY, new 
UTF8(part));
+            ((CrawlDatum)o).getMetaData().put(SEGMENT_NAME_KEY, new 
UTF8(segment));
+          } else if (o instanceof Content) {
+            if (((Content)o).getMetadata() == null) {
+              ((Content)o).setMetadata(new Metadata());
+            }
+            ((Content)o).getMetadata().set(SEGMENT_NAME_KEY.toString(), 
segment);
+          } else if (o instanceof ParseData) {
+            if (((ParseData)o).getParseMeta() == null) {
+              ((ParseData)o).setParseMeta(new Metadata());
+            }
+            ((ParseData)o).getParseMeta().set(SEGMENT_NAME_KEY.toString(), 
segment);
+          } else if (o instanceof ParseText) {
+            String text = ((ParseText)o).getText();
+            o = new ParseText(SEGMENT_NAME_KEY.toString() +
+                    segment + SEGMENT_NAME_KEY.toString() + text);
+            wrapper.set(o);
+          } else {
+            throw new IOException("Unknown value type: " + 
o.getClass().getName() + "(" + o + ")");
+          }
+          return res;
+        }
+      };
+    }
+  }
+
+  public static class SegmentOutputFormat extends 
org.apache.hadoop.mapred.OutputFormatBase {
+    private static final String DEFAULT_SLICE = "default";
+    
+    public RecordWriter getRecordWriter(final FileSystem fs, final JobConf 
job, final String name) throws IOException {
+      return new RecordWriter() {
+        MapFile.Writer c_out = null;
+        MapFile.Writer f_out = null;
+        MapFile.Writer pd_out = null;
+        MapFile.Writer pt_out = null;
+        SequenceFile.Writer g_out = null;
+        SequenceFile.Writer p_out = null;
+        HashMap sliceWriters = new HashMap();
+        
+        public void write(WritableComparable key, Writable value) throws 
IOException {
+          // unwrap
+          Writable o = (Writable)((ObjectWritable)value).get();
+          String slice = null;
+          if (o instanceof CrawlDatum) {
+            // check which output dir it should go into
+            UTF8 part = 
(UTF8)((CrawlDatum)o).getMetaData().get(SEGMENT_PART_KEY);
+            ((CrawlDatum)o).getMetaData().remove(SEGMENT_PART_KEY);
+            ((CrawlDatum)o).getMetaData().remove(SEGMENT_NAME_KEY);
+            if (part == null)
+              throw new IOException("Null segment part, key=" + key);
+            UTF8 uSlice = 
(UTF8)((CrawlDatum)o).getMetaData().get(SEGMENT_SLICE_KEY);
+            ((CrawlDatum)o).getMetaData().remove(SEGMENT_SLICE_KEY);
+            if (uSlice != null) slice = uSlice.toString();
+            String partString = part.toString();
+            if (partString.equals(CrawlDatum.GENERATE_DIR_NAME)) {
+              g_out = ensureSequenceFile(slice, CrawlDatum.GENERATE_DIR_NAME);
+              g_out.append(key, o);
+            } else if (partString.equals(CrawlDatum.FETCH_DIR_NAME)) {
+              f_out = ensureMapFile(slice, CrawlDatum.FETCH_DIR_NAME, 
CrawlDatum.class);
+              f_out.append(key, o);
+            } else if (partString.equals(CrawlDatum.PARSE_DIR_NAME)) {
+              p_out = ensureSequenceFile(slice, CrawlDatum.PARSE_DIR_NAME);
+              p_out.append(key, o);
+            } else {
+              throw new IOException("Cannot determine segment part: " + 
partString);
+            }
+          } else if (o instanceof Content) {
+            slice = ((Content)o).getMetadata().get(sliceMarker);
+            ((Content)o).getMetadata().remove(sliceMarker);
+            ((Content)o).getMetadata().remove(nameMarker);
+            c_out = ensureMapFile(slice, Content.DIR_NAME, Content.class);
+            c_out.append(key, o);
+          } else if (o instanceof ParseData) {
+            slice = ((ParseData)o).getParseMeta().get(sliceMarker);
+            ((ParseData)o).getParseMeta().remove(sliceMarker);
+            ((ParseData)o).getParseMeta().remove(nameMarker);
+            pd_out = ensureMapFile(slice, ParseData.DIR_NAME, ParseData.class);
+            pd_out.append(key, o);
+          } else if (o instanceof ParseText) {
+            String text = ((ParseText)o).getText();
+            if (text != null) {
+              // get slice name, and remove it from the text
+              if (text.startsWith(sliceMarker)) {
+                int idx = text.indexOf(sliceMarker, sliceMarker.length());
+                if (idx != -1) {
+                  slice = text.substring(sliceMarker.length(), idx);
+                  text = text.substring(idx + sliceMarker.length());
+                }
+              }
+              // get segment name, and remove it from the text
+              if (text.startsWith(nameMarker)) {
+                int idx = text.indexOf(nameMarker, nameMarker.length());
+                if (idx != -1) {
+                  text = text.substring(idx + nameMarker.length());
+                }
+              }
+              o = new ParseText(text);
+            }
+            pt_out = ensureMapFile(slice, ParseText.DIR_NAME, ParseText.class);
+            pt_out.append(key, o);
+          }
+        }
+        
+        // lazily create SequenceFile-s.
+        private SequenceFile.Writer ensureSequenceFile(String slice, String 
dirName) throws IOException {
+          if (slice == null) slice = DEFAULT_SLICE;
+          SequenceFile.Writer res = 
(SequenceFile.Writer)sliceWriters.get(slice + dirName);
+          if (res != null) return res;
+          String wname;
+          if (slice == DEFAULT_SLICE) {
+            wname = new File(new File(job.getOutputDir(), dirName), 
name).toString();
+          } else {
+            wname = new File(new File(new File(job.getOutputDir(), slice), 
dirName), name).toString();
+          }
+          res = new SequenceFile.Writer(fs, wname, UTF8.class, 
CrawlDatum.class);
+          sliceWriters.put(slice + dirName, res);
+          return res;
+        }
+
+        // lazily create MapFile-s.
+        private MapFile.Writer ensureMapFile(String slice, String dirName, 
Class clazz) throws IOException {
+          if (slice == null) slice = DEFAULT_SLICE;
+          MapFile.Writer res = (MapFile.Writer)sliceWriters.get(slice + 
dirName);
+          if (res != null) return res;
+          String wname;
+          if (slice == DEFAULT_SLICE) {
+            wname = new File(new File(job.getOutputDir(), dirName), 
name).toString();
+          } else {
+            wname = new File(new File(new File(job.getOutputDir(), slice), 
dirName), name).toString();
+          }
+          res = new MapFile.Writer(fs, wname, UTF8.class, clazz);
+          sliceWriters.put(slice + dirName, res);
+          return res;
+        }
+
+        public void close(Reporter reporter) throws IOException {
+          Iterator it = sliceWriters.values().iterator();
+          while (it.hasNext()) {
+            Object o = it.next();
+            if (o instanceof SequenceFile.Writer) {
+              ((SequenceFile.Writer)o).close();
+            } else {
+              ((MapFile.Writer)o).close();
+            }
+          }
+        }
+      };
+    }
+  }
+
+  public SegmentMerger() {
+    super(null);
+  }
+  
+  public SegmentMerger(Configuration conf) {
+    super(conf);
+  }
+  
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    if (conf == null) return;
+    if (conf.getBoolean("segment.merger.filter", false))
+      filters = new URLFilters(conf);
+    sliceSize = conf.getLong("segment.merger.slice", -1);
+    if (sliceSize > 0)
+      LOG.info("Slice size: " + sliceSize + " URLs.");
+  }
+
+  public void close() throws IOException {
+  }
+
+  public void configure(JobConf conf) {
+    setConf(conf);
+    if (sliceSize > 0) {
+      sliceSize = sliceSize / conf.getNumReduceTasks();
+    }
+  }
+  
+  public void map(WritableComparable key, Writable value, OutputCollector 
output, Reporter reporter) throws IOException {
+    if (filters != null) {
+      try {
+        if (filters.filter(((UTF8)key).toString()) == null) {
+          return;
+        }
+      } catch (Exception e) {
+        LOG.warning("Cannot filter key " + key + ": " + e.getMessage());
+      }
+    }
+    output.collect(key, value);
+  }
+
+  /**
+   * NOTE: in selecting the latest version we rely exclusively on the segment
+   * name (not all segment data contain time information). Therefore it is 
extremely
+   * important that segments be named in an increasing lexicographic order as
+   * their creation time increases.
+   */
+  public void reduce(WritableComparable key, Iterator values, OutputCollector 
output, Reporter reporter) throws IOException {
+    CrawlDatum lastG = null;
+    CrawlDatum lastF = null;
+    CrawlDatum lastSig = null;
+    Content lastC = null;
+    ParseData lastPD = null;
+    ParseText lastPT = null;
+    String lastGname = null;
+    String lastFname = null;
+    String lastSigname = null;
+    String lastCname = null;
+    String lastPDname = null;
+    String lastPTname = null;
+    TreeMap linked = new TreeMap();
+    while (values.hasNext()) {
+      ObjectWritable wrapper = (ObjectWritable)values.next();
+      Object o = wrapper.get();
+      if (o instanceof CrawlDatum) {
+        CrawlDatum val = (CrawlDatum)o;
+        // check which output dir it belongs to
+        UTF8 part = (UTF8)val.getMetaData().get(SEGMENT_PART_KEY);
+        if (part == null)
+          throw new IOException("Null segment part, key=" + key);
+        UTF8 uName = (UTF8)val.getMetaData().get(SEGMENT_NAME_KEY);
+        if (uName == null)
+          throw new IOException("Null segment name, key=" + key);
+        String name = uName.toString();
+        String partString = part.toString();
+        if (partString.equals(CrawlDatum.GENERATE_DIR_NAME)) {
+          if (lastG == null) {
+            lastG = val;
+            lastGname = name;
+          } else {
+            // take newer
+            if (lastGname.compareTo(name) < 0) {
+              lastG = val;
+              lastGname = name;
+            }
+          }
+        } else if (partString.equals(CrawlDatum.FETCH_DIR_NAME)) {
+          if (lastF == null) {
+            lastF = val;
+            lastFname = name;
+          } else {
+            // take newer
+            if (lastFname.compareTo(name) < 0) {
+              lastF = val;
+              lastFname = name;
+            }
+          }
+        } else if (partString.equals(CrawlDatum.PARSE_DIR_NAME)) {
+          if (val.getStatus() == CrawlDatum.STATUS_SIGNATURE) {
+            if (lastSig == null) {
+              lastSig = val;
+              lastSigname = name;
+            } else {
+              // take newer
+              if (lastSigname.compareTo(name) < 0) {
+                lastSig = val;
+                lastSigname = name;
+              }
+            }
+            continue;
+          }
+          // collect all LINKED values from the latest segment
+          ArrayList segLinked = (ArrayList)linked.get(name);
+          if (segLinked == null) {
+            segLinked = new ArrayList();
+            linked.put(name, segLinked);
+          }
+          segLinked.add(val);
+        } else {
+          throw new IOException("Cannot determine segment part: " + 
partString);
+        }
+      } else if (o instanceof Content) {
+        String name = 
((Content)o).getMetadata().get(SEGMENT_NAME_KEY.toString());
+        if (lastC == null) {
+          lastC = (Content)o;
+          lastCname = name;
+        } else {
+          if (lastCname.compareTo(name) < 0) {
+            lastC = (Content)o;
+            lastCname = name;
+          }
+        }
+      } else if (o instanceof ParseData) {
+        String name = 
((ParseData)o).getParseMeta().get(SEGMENT_NAME_KEY.toString());
+        if (lastPD == null) {
+          lastPD = (ParseData)o;
+          lastPDname = name;
+        } else {
+          if (lastPDname.compareTo(name) < 0) {
+            lastPD = (ParseData)o;
+            lastPDname = name;
+          }
+        }
+      } else if (o instanceof ParseText) {
+        String text = ((ParseText)o).getText();
+        String name = null;
+        int idx = text.indexOf(nameMarker, nameMarker.length());
+        if (idx != -1) {
+          name = text.substring(nameMarker.length(), idx);
+        } else {
+          throw new IOException("Missing segment name marker in ParseText, key 
" + key + ": " + text);
+        }
+        if (lastPT == null) {
+          lastPT = (ParseText)o;
+          lastPTname = name;
+        } else {
+          if (lastPTname.compareTo(name) < 0) {
+            lastPT = (ParseText)o;
+            lastPTname = name;
+          }
+        }
+      }
+    }
+    curCount++;
+    UTF8 sliceName = null;
+    ObjectWritable wrapper = new ObjectWritable();
+    if (sliceSize > 0) {
+      sliceName = new UTF8(String.valueOf(curCount / sliceSize));
+    }
+    // now output the latest values
+    if (lastG != null) {
+      if (sliceName != null) {
+        lastG.getMetaData().put(SEGMENT_SLICE_KEY, sliceName);
+      }
+      wrapper.set(lastG);
+      output.collect(key, wrapper);
+    }
+    if (lastF != null) {
+      if (sliceName != null) {
+        lastF.getMetaData().put(SEGMENT_SLICE_KEY, sliceName);
+      }
+      wrapper.set(lastF);
+      output.collect(key, wrapper);
+    }
+    if (lastSig != null) {
+      if (sliceName != null) {
+        lastSig.getMetaData().put(SEGMENT_SLICE_KEY, sliceName);
+      }
+      wrapper.set(lastSig);
+      output.collect(key, wrapper);
+    }
+    if (lastC != null) {
+      if (sliceName != null) {
+        lastC.getMetadata().set(sliceMarker, sliceName.toString());
+      }
+      wrapper.set(lastC);
+      output.collect(key, wrapper);
+    }
+    if (lastPD != null) {
+      if (sliceName != null) {
+        lastPD.getParseMeta().set(sliceMarker, sliceName.toString());
+      }
+      wrapper.set(lastPD);
+      output.collect(key, wrapper);
+    }
+    if (lastPT != null) {
+      if (sliceName != null) {
+        lastPT = new ParseText(sliceMarker + sliceName + sliceMarker
+                + lastPT.getText());
+      }
+      wrapper.set(lastPT);
+      output.collect(key, wrapper);
+    }
+    if (linked.size() > 0) {
+      String name = (String)linked.lastKey();
+      ArrayList segLinked = (ArrayList)linked.get(name);
+      for (int i = 0; i < segLinked.size(); i++) {
+        CrawlDatum link = (CrawlDatum)segLinked.get(i);
+        if (sliceName != null) {
+          link.getMetaData().put(SEGMENT_SLICE_KEY, sliceName);
+        }
+        wrapper.set(link);
+        output.collect(key, wrapper);
+      }
+    }
+  }
+
+  public void merge(File out, File[] segs, boolean filter, long slice) throws 
Exception {
+    LOG.info("Merging " + segs.length + " segments to " + out);
+    JobConf job = new JobConf(getConf());
+    job.setJobName("mergesegs " + out);
+    job.setBoolean("segment.merger.filter", filter);
+    job.setLong("segment.merger.slice", slice);
+    FileSystem fs = FileSystem.get(getConf());
+    // prepare the minimal common set of input dirs
+    boolean g = true;
+    boolean f = true;
+    boolean p = true;
+    boolean c = true;
+    boolean pd = true;
+    boolean pt = true;
+    for (int i = 0; i < segs.length; i++) {
+      if (!fs.exists(segs[i])) {
+        LOG.warning("SegmentMerger: input dir " + segs[i] + " doesn't exist, 
skipping.");
+        segs[i] = null;
+        continue;
+      }
+      LOG.info("SegmentMerger:   adding " + segs[i]);
+      File cDir = new File(segs[i], Content.DIR_NAME);
+      File gDir = new File(segs[i], CrawlDatum.GENERATE_DIR_NAME);
+      File fDir = new File(segs[i], CrawlDatum.FETCH_DIR_NAME);
+      File pDir = new File(segs[i], CrawlDatum.PARSE_DIR_NAME);
+      File pdDir = new File(segs[i], ParseData.DIR_NAME);
+      File ptDir = new File(segs[i], ParseText.DIR_NAME);
+      c = c && fs.exists(cDir);
+      g = g && fs.exists(gDir);
+      f = f && fs.exists(fDir);
+      p = p && fs.exists(pDir);
+      pd = pd && fs.exists(pdDir);
+      pt = pt && fs.exists(ptDir);
+    }
+    StringBuffer sb = new StringBuffer();
+    if (c) sb.append(" " + Content.DIR_NAME);
+    if (g) sb.append(" " + CrawlDatum.GENERATE_DIR_NAME);
+    if (f) sb.append(" " + CrawlDatum.FETCH_DIR_NAME);
+    if (p) sb.append(" " + CrawlDatum.PARSE_DIR_NAME);
+    if (pd) sb.append(" " + ParseData.DIR_NAME);
+    if (pt) sb.append(" " + ParseText.DIR_NAME);
+    LOG.info("SegmentMerger: using segment data from:" + sb.toString());
+    for (int i = 0; i < segs.length; i++) {
+      if (segs[i] == null) continue;
+      if (g) {
+        File gDir = new File(segs[i], CrawlDatum.GENERATE_DIR_NAME);
+        job.addInputDir(gDir);
+      }
+      if (c) {
+        File cDir = new File(segs[i], Content.DIR_NAME);
+        job.addInputDir(cDir);
+      }
+      if (f) {
+        File fDir = new File(segs[i], CrawlDatum.FETCH_DIR_NAME);
+        job.addInputDir(fDir);
+      }
+      if (p) {
+        File pDir = new File(segs[i], CrawlDatum.PARSE_DIR_NAME);
+        job.addInputDir(pDir);
+      }
+      if (pd) {
+        File pdDir = new File(segs[i], ParseData.DIR_NAME);
+        job.addInputDir(pdDir);
+      }
+      if (pt) {
+        File ptDir = new File(segs[i], ParseText.DIR_NAME);
+        job.addInputDir(ptDir);
+      }
+    }
+    job.setInputFormat(ObjectInputFormat.class);
+    job.setInputKeyClass(UTF8.class);
+    job.setInputValueClass(ObjectWritable.class);
+    job.setMapperClass(SegmentMerger.class);
+    job.setReducerClass(SegmentMerger.class);
+    job.setOutputDir(out);
+    job.setOutputKeyClass(UTF8.class);
+    job.setOutputValueClass(ObjectWritable.class);
+    job.setOutputFormat(SegmentOutputFormat.class);
+    
+    setConf(job);
+    
+    JobClient.runJob(job);
+  }
+
+  /**
+   * @param args
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println("SegmentMerger output_dir (-dir segments | seg1 seg2 
...) [-filter] [-slice NNNN]");
+      System.err.println("\toutput_dir\tname of the resulting segment, or the 
parent dir of segment slices");
+      System.err.println("\t-dir segments\tparent dir containing several 
segments");
+      System.err.println("\tseg1 seg2 ...\tlist of segment dirs");
+      System.err.println("\t-filter\t\tfilter out URL-s prohibited by current 
URLFilters");
+      System.err.println("\t-slice NNNN\tcreate many output segments, each 
containing NNNN URLs");
+      return;
+    }
+    Configuration conf = NutchConfiguration.create();
+    final FileSystem fs = FileSystem.get(conf);
+    File out = new File(args[0]);
+    ArrayList segs = new ArrayList();
+    long sliceSize = 0;
+    boolean filter = false;
+    for (int i = 1; i < args.length; i++) {
+      if (args[i].equals("-dir")) {
+        File[] files = fs.listFiles(new File(args[++i]), new FileFilter() {
+          public boolean accept(File f) {
+            try {
+              if (fs.isDirectory(f)) return true;
+            } catch (IOException e) {}
+            ;
+            return false;
+          }
+        });
+        for (int j = 0; j < files.length; j++)
+          segs.add(files[j]);
+      } else if (args[i].equals("-filter")) {
+        filter = true;
+      } else if (args[i].equals("-slice")) {
+        sliceSize = Long.parseLong(args[++i]);
+      } else {
+        segs.add(new File(args[i]));
+      }
+    }
+    if (segs.size() == 0) {
+      System.err.println("ERROR: No input segments.");
+      return;
+    }
+    SegmentMerger merger = new SegmentMerger(conf);
+    merger.merge(out, (File[]) segs.toArray(new File[segs.size()]), filter, 
sliceSize);
+  }
+
+}

Propchange: 
lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbMerger.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbMerger.java?rev=405183&view=auto
==============================================================================
--- lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbMerger.java 
(added)
+++ lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbMerger.java 
Mon May  8 14:58:18 2006
@@ -0,0 +1,138 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.UTF8;
+import org.apache.nutch.util.NutchConfiguration;
+
+import junit.framework.TestCase;
+
+public class TestCrawlDbMerger extends TestCase {
+  private static final Logger LOG = 
Logger.getLogger(CrawlDbMerger.class.getName());
+  
+  String url10 = "http://example.com/";;
+  String url11 = "http://example.com/foo";;
+  String url20 = "http://example.com/";;
+  String url21 = "http://example.com/bar";;
+  String[] urls_expected = new String[] {
+          url10,
+          url11,
+          url21
+  };
+  
+  TreeSet init1 = new TreeSet();
+  TreeSet init2 = new TreeSet();
+  HashMap expected = new HashMap();
+  CrawlDatum cd1, cd2, cd3;
+  Configuration conf;
+  FileSystem fs;
+  File testDir;
+  CrawlDbReader reader;
+  
+  public void setUp() throws Exception {
+    init1.add(url10);
+    init1.add(url11);
+    init2.add(url20);
+    init2.add(url21);
+    long time = System.currentTimeMillis();
+    cd1 = new CrawlDatum();
+    cd1.setFetchInterval(1.0f);
+    cd1.setFetchTime(time);
+    cd1.getMetaData().put(new UTF8("name"), new UTF8("cd1"));
+    cd1.getMetaData().put(new UTF8("cd1"), new UTF8("cd1"));
+    cd2 = new CrawlDatum();
+    cd2.setFetchInterval(1.0f);
+    cd2.setFetchTime(time + 10000);
+    cd2.getMetaData().put(new UTF8("name"), new UTF8("cd2"));
+    cd3 = new CrawlDatum();
+    cd3.setFetchInterval(1.0f);
+    cd3.setFetchTime(time + 10000);
+    cd3.getMetaData().putAll(cd1.getMetaData());
+    cd3.getMetaData().putAll(cd2.getMetaData());
+    expected.put(url10, cd3);
+    expected.put(url11, cd1);
+    expected.put(url21, cd2);
+    conf = NutchConfiguration.create();
+    fs = FileSystem.get(conf);
+    testDir = new File("test-crawldb-" +
+            new java.util.Random().nextInt());
+    fs.mkdirs(testDir);
+  }
+  
+  public void tearDown() {
+    try {
+      if (fs.exists(testDir))
+        fs.delete(testDir);
+    } catch (Exception e) { }
+    try {
+      reader.close();
+    } catch (Exception e) { }
+  }
+
+  public void testMerge() throws Exception {
+    File crawldb1 = new File(testDir, "crawldb1");
+    File crawldb2 = new File(testDir, "crawldb2");
+    File output = new File(testDir, "output");
+    createCrawlDb(fs, crawldb1, init1, cd1);
+    createCrawlDb(fs, crawldb2, init2, cd2);
+    CrawlDbMerger merger = new CrawlDbMerger(conf);
+    LOG.fine("* merging crawldbs to " + output);
+    merger.merge(output, new File[]{crawldb1, crawldb2}, false);
+    LOG.fine("* reading crawldb: " + output);
+    reader = new CrawlDbReader();
+    String crawlDb = output.toString();
+    Iterator it = expected.keySet().iterator();
+    while (it.hasNext()) {
+      String url = (String)it.next();
+      LOG.fine("url=" + url);
+      CrawlDatum cd = (CrawlDatum)expected.get(url);
+      CrawlDatum res = reader.get(crawlDb, url, conf);
+      LOG.fine(" -> " + res);
+      System.out.println("url=" + url);
+      System.out.println(" cd " + cd);
+      System.out.println(" res " + res);
+      // may not be null
+      assertNotNull(res);
+      assertTrue(cd.equals(res));
+    }
+    reader.close();
+    fs.delete(testDir);
+  }
+  
+  private void createCrawlDb(FileSystem fs, File crawldb, TreeSet init, 
CrawlDatum cd) throws Exception {
+    LOG.fine("* creating crawldb: " + crawldb);
+    File dir = new File(crawldb, CrawlDatum.DB_DIR_NAME);
+    MapFile.Writer writer = new MapFile.Writer(fs, new File(dir, 
"part-00000").toString(), UTF8.class, CrawlDatum.class);
+    Iterator it = init.iterator();
+    while (it.hasNext()) {
+      String key = (String)it.next();
+      writer.append(new UTF8(key), cd);
+    }
+    writer.close();
+  }
+}

Propchange: 
lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbMerger.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestLinkDbMerger.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestLinkDbMerger.java?rev=405183&view=auto
==============================================================================
--- lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestLinkDbMerger.java 
(added)
+++ lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestLinkDbMerger.java 
Mon May  8 14:58:18 2006
@@ -0,0 +1,160 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.TreeMap;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.UTF8;
+import org.apache.nutch.util.NutchConfiguration;
+
+import junit.framework.TestCase;
+
+public class TestLinkDbMerger extends TestCase {
+  private static final Logger LOG = 
Logger.getLogger(TestLinkDbMerger.class.getName());
+  
+  String url10 = "http://example.com/foo";;
+  String[] urls10 = new String[] {
+          "http://example.com/100";,
+          "http://example.com/101";
+        };
+
+  String url11 = "http://example.com/";;
+  String[] urls11 = new String[] {
+          "http://example.com/110";,
+          "http://example.com/111";
+        };
+  
+  String url20 = "http://example.com/";;
+  String[] urls20 = new String[] {
+          "http://foo.com/200";,
+          "http://foo.com/201";
+        };
+  String url21 = "http://example.com/bar";;
+  String[] urls21 = new String[] {
+          "http://foo.com/210";,
+          "http://foo.com/211";
+        };
+  
+  String[] urls10_expected = urls10;
+  String[] urls11_expected = new String[] {
+          urls11[0],
+          urls11[1],
+          urls20[0],
+          urls20[1]
+  };
+  String[] urls20_expected = urls11_expected;
+  String[] urls21_expected = urls21;
+  
+  TreeMap init1 = new TreeMap();
+  TreeMap init2 = new TreeMap();
+  HashMap expected = new HashMap();
+  Configuration conf;
+  File testDir;
+  FileSystem fs;
+  LinkDbReader reader;
+  
+  public void setUp() throws Exception {
+    init1.put(url10, urls10);
+    init1.put(url11, urls11);
+    init2.put(url20, urls20);
+    init2.put(url21, urls21);
+    expected.put(url10, urls10_expected);
+    expected.put(url11, urls11_expected);
+    expected.put(url20, urls20_expected);
+    expected.put(url21, urls21_expected);
+    conf = NutchConfiguration.create();
+    fs = FileSystem.get(conf);
+    testDir = new File("test-crawldb-" +
+            new java.util.Random().nextInt());
+    fs.mkdirs(testDir);
+  }
+  
+  public void tearDown() {
+    try {
+      if (fs.exists(testDir))
+        fs.delete(testDir);
+    } catch (Exception e) { }
+    try {
+      reader.close();
+    } catch (Exception e) { }
+  }
+
+  public void testMerge() throws Exception {
+    Configuration conf = NutchConfiguration.create();
+    FileSystem fs = FileSystem.get(conf);
+    File testDir = new File("test-linkdb-" +
+            new java.util.Random().nextInt());
+    fs.mkdirs(testDir);
+    File linkdb1 = new File(testDir, "linkdb1");
+    File linkdb2 = new File(testDir, "linkdb2");
+    File output = new File(testDir, "output");
+    createLinkDb(fs, linkdb1, init1);
+    createLinkDb(fs, linkdb2, init2);
+    LinkDbMerger merger = new LinkDbMerger(conf);
+    LOG.fine("* merging linkdbs to " + output);
+    merger.merge(output, new File[]{linkdb1, linkdb2}, false);
+    LOG.fine("* reading linkdb: " + output);
+    reader = new LinkDbReader(fs, output, conf);
+    Iterator it = expected.keySet().iterator();
+    while (it.hasNext()) {
+      String url = (String)it.next();
+      LOG.fine("url=" + url);
+      String[] vals = (String[])expected.get(url);
+      Inlinks inlinks = reader.getInlinks(new UTF8(url));
+      // may not be null
+      assertNotNull(inlinks);
+      ArrayList links = new ArrayList();
+      Iterator it2 = inlinks.iterator();
+      while (it2.hasNext()) {
+        Inlink in = (Inlink)it2.next();
+        links.add(in.getFromUrl());
+      }
+      for (int i = 0; i < vals.length; i++) {
+        LOG.fine(" -> " + vals[i]);
+        assertTrue(links.contains(vals[i]));
+      }
+    }
+    reader.close();
+    fs.delete(testDir);
+  }
+  
+  private void createLinkDb(FileSystem fs, File linkdb, TreeMap init) throws 
Exception {
+    LOG.fine("* creating linkdb: " + linkdb);
+    File dir = new File(linkdb, LinkDb.CURRENT_NAME);
+    MapFile.Writer writer = new MapFile.Writer(fs, new File(dir, 
"part-00000").toString(), UTF8.class, Inlinks.class);
+    Iterator it = init.keySet().iterator();
+    while (it.hasNext()) {
+      String key = (String)it.next();
+      Inlinks inlinks = new Inlinks();
+      String[] vals = (String[])init.get(key);
+      for (int i = 0; i < vals.length; i++) {
+        Inlink in = new Inlink(vals[i], vals[i]);
+        inlinks.add(in);
+      }
+      writer.append(new UTF8(key), inlinks);
+    }
+    writer.close();
+  }
+}

Propchange: 
lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestLinkDbMerger.java
------------------------------------------------------------------------------
    svn:eol-style = native




-------------------------------------------------------
Using Tomcat but need to do more? Need to support web services, security?
Get stuff done quickly with pre-integrated technology to make your job easier
Download IBM WebSphere Application Server v.1.0.1 based on Apache Geronimo
http://sel.as-us.falkag.net/sel?cmd=lnk&kid=120709&bid=263057&dat=121642
_______________________________________________
Nutch-cvs mailing list
Nutch-cvs@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nutch-cvs

Reply via email to