Author: ab Date: Mon May 8 14:58:18 2006 New Revision: 405183 URL: http://svn.apache.org/viewcvs?rev=405183&view=rev Log: Add the following tools (see also NUTCH-264):
* CrawlDbMerger: merges one or more crawldb-s, with optional filtering * LinkDbMerger: merges one or more linkdb-s, with optional filtering * SegmentMerger: merges one or more segments, with optional filtering and slicing Development of these tools has been sponsored by houxou.com - thank you! Added: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbMerger.java (with props) lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java (with props) lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java (with props) lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbMerger.java (with props) lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestLinkDbMerger.java (with props) Modified: lucene/nutch/trunk/bin/nutch lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseData.java lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java Modified: lucene/nutch/trunk/bin/nutch URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/bin/nutch?rev=405183&r1=405182&r2=405183&view=diff ============================================================================== --- lucene/nutch/trunk/bin/nutch (original) +++ lucene/nutch/trunk/bin/nutch Mon May 8 14:58:18 2006 @@ -30,14 +30,17 @@ echo "where COMMAND is one of:" echo " crawl one-step crawler for intranets" echo " readdb read / dump crawl db" + echo " mergedb merge crawldb-s, with optional filtering" echo " readlinkdb read / dump link db" echo " inject inject new urls into the database" echo " generate generate new segments to fetch" echo " fetch fetch a segment's pages" echo " parse parse a segment's pages" echo " segread read / dump segment data" + echo " mergesegs merge several segments, with optional filtering and slicing" echo " updatedb update crawl db from segments after fetching" echo " invertlinks create a linkdb from parsed segments" + echo " mergelinkdb merge linkdb-s, with optional filtering" echo " index run the indexer on parsed segments and linkdb" echo " merge merge several segment indexes" echo " dedup remove duplicates from a set of segment indexes" @@ -131,14 +134,20 @@ CLASS=org.apache.nutch.parse.ParseSegment elif [ "$COMMAND" = "readdb" ] ; then CLASS=org.apache.nutch.crawl.CrawlDbReader +elif [ "$COMMAND" = "mergedb" ] ; then + CLASS=org.apache.nutch.crawl.CrawlDbMerger elif [ "$COMMAND" = "readlinkdb" ] ; then CLASS=org.apache.nutch.crawl.LinkDbReader elif [ "$COMMAND" = "segread" ] ; then CLASS=org.apache.nutch.segment.SegmentReader +elif [ "$COMMAND" = "mergesegs" ] ; then + CLASS=org.apache.nutch.segment.SegmentMerger elif [ "$COMMAND" = "updatedb" ] ; then CLASS=org.apache.nutch.crawl.CrawlDb elif [ "$COMMAND" = "invertlinks" ] ; then CLASS=org.apache.nutch.crawl.LinkDb +elif [ "$COMMAND" = "mergelinkdb" ] ; then + CLASS=org.apache.nutch.crawl.LinkDbMerger elif [ "$COMMAND" = "index" ] ; then CLASS=org.apache.nutch.indexer.Indexer elif [ "$COMMAND" = "dedup" ] ; then Added: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbMerger.java URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbMerger.java?rev=405183&view=auto ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbMerger.java (added) +++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbMerger.java Mon May 8 14:58:18 2006 @@ -0,0 +1,163 @@ +/** + * Copyright 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you 
may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.crawl; + +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.logging.Logger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.UTF8; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.*; +import org.apache.nutch.net.URLFilters; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; + +/** + * This tool merges several CrawlDb-s into one, optionally filtering + * URLs through the current URLFilters, to skip prohibited + * pages. + * + * <p>It's possible to use this tool just for filtering - in that case + * only one CrawlDb should be specified in arguments.</p> + * <p>If more than one CrawlDb contains information about the same URL, + * only the most recent version is retained, as determined by the + * value of {@link org.apache.nutch.crawl.CrawlDatum#getFetchTime()}. + * However, all metadata information from all versions is accumulated, + * with newer values taking precedence over older values. + * + * @author Andrzej Bialecki + */ +public class CrawlDbMerger extends Configured { + private static final Logger LOG = Logger.getLogger(CrawlDbMerger.class.getName()); + + public static class Merger extends MapReduceBase implements Reducer { + private URLFilters filters = null; + MapWritable meta = new MapWritable(); + + public void close() throws IOException {} + + public void configure(JobConf conf) { + if (conf.getBoolean("crawldb.merger.urlfilters", false)) + filters = new URLFilters(conf); + } + + public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) + throws IOException { + if (filters != null) { + try { + if (filters.filter(((UTF8) key).toString()) == null) + return; + } catch (Exception e) { + LOG.fine("Can't filter " + key + ": " + e); + } + } + CrawlDatum res = null; + long resTime = 0L; + meta.clear(); + while (values.hasNext()) { + CrawlDatum val = (CrawlDatum) values.next(); + if (res == null) { + res = val; + resTime = res.getFetchTime() - Math.round(res.getFetchInterval() * 3600 * 24 * 1000); + meta.putAll(res.getMetaData()); + continue; + } + // compute last fetch time, and pick the latest + long valTime = val.getFetchTime() - Math.round(val.getFetchInterval() * 3600 * 24 * 1000); + if (valTime > resTime) { + // collect all metadata, newer values override older values + meta.putAll(val.getMetaData()); + res = val; + resTime = res.getFetchTime() - Math.round(res.getFetchInterval() * 3600 * 24 * 1000); + } else { + // insert older metadata before newer + val.getMetaData().putAll(meta); + meta = val.getMetaData(); + } + } + res.setMetaData(meta); + output.collect(key, res); + } + } + + public CrawlDbMerger(Configuration conf) { + super(conf); + } + + public void merge(File output, File[] dbs, boolean filter) throws Exception { + JobConf job = createMergeJob(getConf(), output); +
job.setBoolean("crawldb.merger.urlfilters", filter); + for (int i = 0; i < dbs.length; i++) { + job.addInputDir(new File(dbs[i], CrawlDatum.DB_DIR_NAME)); + } + JobClient.runJob(job); + FileSystem fs = FileSystem.get(getConf()); + fs.mkdirs(output); + fs.rename(job.getOutputDir(), new File(output, CrawlDatum.DB_DIR_NAME)); + } + + public static JobConf createMergeJob(Configuration conf, File output) { + File newCrawlDb = new File("crawldb-merge-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); + + JobConf job = new NutchJob(conf); + job.setJobName("crawldb merge " + output); + + job.setInputFormat(SequenceFileInputFormat.class); + job.setInputKeyClass(UTF8.class); + job.setInputValueClass(CrawlDatum.class); + + job.setReducerClass(Merger.class); + + job.setOutputDir(newCrawlDb); + job.setOutputFormat(MapFileOutputFormat.class); + job.setOutputKeyClass(UTF8.class); + job.setOutputValueClass(CrawlDatum.class); + + return job; + } + + /** + * @param args + */ + public static void main(String[] args) throws Exception { + if (args.length < 2) { + System.err.println("CrawlDbMerger output_crawldb crawldb1 [crawldb2 crawldb3 ...] [-filter]"); + System.err.println("\toutput_crawldb\toutput CrawlDb"); + System.err.println("\tcrawldb1 ...\tinput CrawlDb-s"); + System.err.println("\t-filter\tuse URLFilters on urls in the crawldb(s)"); + return; + } + Configuration conf = NutchConfiguration.create(); + File output = new File(args[0]); + ArrayList dbs = new ArrayList(); + boolean filter = false; + for (int i = 1; i < args.length; i++) { + if (args[i].equals("-filter")) { + filter = true; + continue; + } + dbs.add(new File(args[i])); + } + CrawlDbMerger merger = new CrawlDbMerger(conf); + merger.merge(output, (File[]) dbs.toArray(new File[dbs.size()]), filter); + } +} Propchange: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbMerger.java ------------------------------------------------------------------------------ svn:eol-style = native Added: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java?rev=405183&view=auto ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java (added) +++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java Mon May 8 14:58:18 2006 @@ -0,0 +1,89 @@ +/** + * Copyright 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nutch.crawl; + +import java.io.File; +import java.util.ArrayList; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.nutch.util.NutchConfiguration; + +/** + * This tool merges several LinkDb-s into one, optionally filtering + * URLs through the current URLFilters, to skip prohibited URLs and + * links. + * + * <p>It's possible to use this tool just for filtering - in that case + * only one LinkDb should be specified in arguments.</p> + * <p>If more than one LinkDb contains information about the same URL, + * all inlinks are accumulated, but only at most <code>db.max.inlinks</code> + * inlinks will ever be added.</p> + * <p>If activated, URLFilters will be applied to both the target URLs and + * to any incoming link URL. If a target URL is prohibited, all + * inlinks to that target will be removed, including the target URL. If + * some of incoming links are prohibited, only they will be removed, and they + * won't count when checking the above-mentioned maximum limit. + * + * @author Andrzej Bialecki + */ +public class LinkDbMerger extends Configured { + + public LinkDbMerger(Configuration conf) { + super(conf); + } + + public void merge(File output, File[] dbs, boolean filter) throws Exception { + JobConf job = LinkDb.createMergeJob(getConf(), output); + job.setBoolean("linkdb.merger.urlfilters", filter); + for (int i = 0; i < dbs.length; i++) { + job.addInputDir(new File(dbs[i], LinkDb.CURRENT_NAME)); + } + JobClient.runJob(job); + FileSystem fs = FileSystem.get(getConf()); + fs.mkdirs(output); + fs.rename(job.getOutputDir(), new File(output, LinkDb.CURRENT_NAME)); + } + /** + * @param args + */ + public static void main(String[] args) throws Exception { + if (args.length < 2) { + System.err.println("LinkDbMerger output_linkdb linkdb1 [linkdb2 linkdb3 ...] [-filter]"); + System.err.println("\toutput_linkdb\toutput LinkDb"); + System.err.println("\tlinkdb1 ...\tinput LinkDb-s"); + System.err.println("\t-filter\tuse URLFilters on both fromUrls and toUrls in linkdb(s)"); + return; + } + Configuration conf = NutchConfiguration.create(); + File output = new File(args[0]); + ArrayList dbs = new ArrayList(); + boolean filter = false; + for (int i = 1; i < args.length; i++) { + if (args[i].equals("-filter")) { + filter = true; + continue; + } + dbs.add(new File(args[i])); + } + LinkDbMerger merger = new LinkDbMerger(conf); + merger.merge(output, (File[])dbs.toArray(new File[dbs.size()]), filter); + } + +} Propchange: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDbMerger.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseData.java URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseData.java?rev=405183&r1=405182&r2=405183&view=diff ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseData.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseData.java Mon May 8 14:58:18 2006 @@ -87,6 +87,10 @@ */ public Metadata getParseMeta() { return parseMeta; } + public void setParseMeta(Metadata parseMeta) { + this.parseMeta = parseMeta; + } + /** * Get a metadata single value. 
* This method first looks for the metadata value in the parse metadata. If no Modified: lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java?rev=405183&r1=405182&r2=405183&view=diff ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java Mon May 8 14:58:18 2006 @@ -146,6 +146,12 @@ return metadata; } + /** Other protocol-specific data. */ + public void setMetadata(Metadata metadata) { + ensureInflated(); + this.metadata = metadata; + } + public boolean equals(Object o) { ensureInflated(); if (!(o instanceof Content)){ Added: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java?rev=405183&view=auto ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java (added) +++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java Mon May 8 14:58:18 2006 @@ -0,0 +1,647 @@ +/** + * Copyright 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.segment; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.util.*; +import java.util.logging.Logger; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapred.*; +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.metadata.Metadata; +import org.apache.nutch.net.URLFilters; +import org.apache.nutch.parse.ParseData; +import org.apache.nutch.parse.ParseText; +import org.apache.nutch.protocol.Content; +import org.apache.nutch.util.NutchConfiguration; + +/** + * This tool takes several segments and merges their data together. Only the + * latest versions of data is retained. + * <p> + * Optionally, you can apply current URLFilters to remove prohibited URL-s. + * </p> + * <p> + * Also, it's possible to slice the resulting segment into chunks of fixed size. + * </p> + * <h3>Important Notes</h3> + * <h4>Which parts are merged?</h4> + * <p>It doesn't make sense to merge data from segments, which are at different stages + * of processing (e.g. one unfetched segment, one fetched but not parsed, and + * one fetched and parsed). Therefore, prior to merging, the tool will determine + * the lowest common set of input data, and only this data will be merged. + * This may have some unintended consequences: + * e.g. 
if majority of input segments are fetched and parsed, but one of them is unfetched, + * the tool will fall back to just merging fetchlists, and it will skip all other data + * from all segments.</p> + * <h4>Merging fetchlists</h4> + * <p>Merging segments, which contain just fetchlists (i.e. prior to fetching) + * is not recommended, because this tool (unlike the {@link org.apache.nutch.crawl.Generator} + * doesn't ensure that fetchlist parts for each map task are disjoint.</p> + * <p> + * <h4>Duplicate content</h4> + * Merging segments removes older content whenever possible (see below). However, + * this is NOT the same as de-duplication, which in addition removes identical + * content found at different URL-s. In other words, running DeleteDuplicates is + * still necessary. + * </p> + * <p>For some types of data (especially ParseText) it's not possible to determine + * which version is really older. Therefore the tool always uses segment names as + * timestamps, for all types of input data. Segment names are compared in forward lexicographic + * order (0-9a-zA-Z), and data from segments with "higher" names will prevail. + * It follows then that it is extremely important that segments be named in an + * increasing lexicographic order as their creation time increases.</p> + * <p> + * <h4>Merging and indexes</h4> + * Merged segment gets a different name. Since Indexer embeds segment names in + * indexes, any indexes originally created for the input segments will NOT work with the + * merged segment. Newly created merged segment(s) need to be indexed afresh. + * This tool doesn't use existing indexes in any way, so if + * you plan to merge segments you don't have to index them prior to merging. + * + * + * @author Andrzej Bialecki + */ +public class SegmentMerger extends Configured implements Mapper, Reducer { + private static final Logger LOG = Logger.getLogger(SegmentMerger.class.getName()); + + private static final UTF8 SEGMENT_PART_KEY = new UTF8("_PaRt_"); + private static final UTF8 SEGMENT_NAME_KEY = new UTF8("_NaMe_"); + private static final String nameMarker = SEGMENT_NAME_KEY.toString(); + private static final UTF8 SEGMENT_SLICE_KEY = new UTF8("_SlIcE_"); + private static final String sliceMarker = SEGMENT_SLICE_KEY.toString(); + + private URLFilters filters = null; + private long sliceSize = -1; + private long curCount = 0; + + /** + * Wraps inputs in an {@link ObjectWritable}, to permit merging different + * types in reduce.
+ */ + public static class ObjectInputFormat extends SequenceFileInputFormat { + public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter) + throws IOException { + + reporter.setStatus(split.toString()); + // find part name + String dir = split.getFile().toString().replace('\\', '/'); + int idx = dir.lastIndexOf("/part-"); + if (idx == -1) { + throw new IOException("Cannot determine segment part: " + dir); + } + dir = dir.substring(0, idx); + idx = dir.lastIndexOf('/'); + if (idx == -1) { + throw new IOException("Cannot determine segment part: " + dir); + } + final String part = dir.substring(idx + 1); + // find segment name + dir = dir.substring(0, idx); + idx = dir.lastIndexOf('/'); + if (idx == -1) { + throw new IOException("Cannot determine segment name: " + dir); + } + final String segment = dir.substring(idx + 1); + + return new SequenceFileRecordReader(job, split) { + public synchronized boolean next(Writable key, Writable value) throws IOException { + ObjectWritable wrapper = (ObjectWritable) value; + try { + wrapper.set(getValueClass().newInstance()); + } catch (Exception e) { + throw new IOException(e.toString()); + } + boolean res = super.next(key, (Writable) wrapper.get()); + Object o = wrapper.get(); + if (o instanceof CrawlDatum) { + // record which part of segment this comes from + ((CrawlDatum)o).getMetaData().put(SEGMENT_PART_KEY, new UTF8(part)); + ((CrawlDatum)o).getMetaData().put(SEGMENT_NAME_KEY, new UTF8(segment)); + } else if (o instanceof Content) { + if (((Content)o).getMetadata() == null) { + ((Content)o).setMetadata(new Metadata()); + } + ((Content)o).getMetadata().set(SEGMENT_NAME_KEY.toString(), segment); + } else if (o instanceof ParseData) { + if (((ParseData)o).getParseMeta() == null) { + ((ParseData)o).setParseMeta(new Metadata()); + } + ((ParseData)o).getParseMeta().set(SEGMENT_NAME_KEY.toString(), segment); + } else if (o instanceof ParseText) { + String text = ((ParseText)o).getText(); + o = new ParseText(SEGMENT_NAME_KEY.toString() + + segment + SEGMENT_NAME_KEY.toString() + text); + wrapper.set(o); + } else { + throw new IOException("Unknown value type: " + o.getClass().getName() + "(" + o + ")"); + } + return res; + } + }; + } + } + + public static class SegmentOutputFormat extends org.apache.hadoop.mapred.OutputFormatBase { + private static final String DEFAULT_SLICE = "default"; + + public RecordWriter getRecordWriter(final FileSystem fs, final JobConf job, final String name) throws IOException { + return new RecordWriter() { + MapFile.Writer c_out = null; + MapFile.Writer f_out = null; + MapFile.Writer pd_out = null; + MapFile.Writer pt_out = null; + SequenceFile.Writer g_out = null; + SequenceFile.Writer p_out = null; + HashMap sliceWriters = new HashMap(); + + public void write(WritableComparable key, Writable value) throws IOException { + // unwrap + Writable o = (Writable)((ObjectWritable)value).get(); + String slice = null; + if (o instanceof CrawlDatum) { + // check which output dir it should go into + UTF8 part = (UTF8)((CrawlDatum)o).getMetaData().get(SEGMENT_PART_KEY); + ((CrawlDatum)o).getMetaData().remove(SEGMENT_PART_KEY); + ((CrawlDatum)o).getMetaData().remove(SEGMENT_NAME_KEY); + if (part == null) + throw new IOException("Null segment part, key=" + key); + UTF8 uSlice = (UTF8)((CrawlDatum)o).getMetaData().get(SEGMENT_SLICE_KEY); + ((CrawlDatum)o).getMetaData().remove(SEGMENT_SLICE_KEY); + if (uSlice != null) slice = uSlice.toString(); + String partString = part.toString(); + if 
(partString.equals(CrawlDatum.GENERATE_DIR_NAME)) { + g_out = ensureSequenceFile(slice, CrawlDatum.GENERATE_DIR_NAME); + g_out.append(key, o); + } else if (partString.equals(CrawlDatum.FETCH_DIR_NAME)) { + f_out = ensureMapFile(slice, CrawlDatum.FETCH_DIR_NAME, CrawlDatum.class); + f_out.append(key, o); + } else if (partString.equals(CrawlDatum.PARSE_DIR_NAME)) { + p_out = ensureSequenceFile(slice, CrawlDatum.PARSE_DIR_NAME); + p_out.append(key, o); + } else { + throw new IOException("Cannot determine segment part: " + partString); + } + } else if (o instanceof Content) { + slice = ((Content)o).getMetadata().get(sliceMarker); + ((Content)o).getMetadata().remove(sliceMarker); + ((Content)o).getMetadata().remove(nameMarker); + c_out = ensureMapFile(slice, Content.DIR_NAME, Content.class); + c_out.append(key, o); + } else if (o instanceof ParseData) { + slice = ((ParseData)o).getParseMeta().get(sliceMarker); + ((ParseData)o).getParseMeta().remove(sliceMarker); + ((ParseData)o).getParseMeta().remove(nameMarker); + pd_out = ensureMapFile(slice, ParseData.DIR_NAME, ParseData.class); + pd_out.append(key, o); + } else if (o instanceof ParseText) { + String text = ((ParseText)o).getText(); + if (text != null) { + // get slice name, and remove it from the text + if (text.startsWith(sliceMarker)) { + int idx = text.indexOf(sliceMarker, sliceMarker.length()); + if (idx != -1) { + slice = text.substring(sliceMarker.length(), idx); + text = text.substring(idx + sliceMarker.length()); + } + } + // get segment name, and remove it from the text + if (text.startsWith(nameMarker)) { + int idx = text.indexOf(nameMarker, nameMarker.length()); + if (idx != -1) { + text = text.substring(idx + nameMarker.length()); + } + } + o = new ParseText(text); + } + pt_out = ensureMapFile(slice, ParseText.DIR_NAME, ParseText.class); + pt_out.append(key, o); + } + } + + // lazily create SequenceFile-s. + private SequenceFile.Writer ensureSequenceFile(String slice, String dirName) throws IOException { + if (slice == null) slice = DEFAULT_SLICE; + SequenceFile.Writer res = (SequenceFile.Writer)sliceWriters.get(slice + dirName); + if (res != null) return res; + String wname; + if (slice == DEFAULT_SLICE) { + wname = new File(new File(job.getOutputDir(), dirName), name).toString(); + } else { + wname = new File(new File(new File(job.getOutputDir(), slice), dirName), name).toString(); + } + res = new SequenceFile.Writer(fs, wname, UTF8.class, CrawlDatum.class); + sliceWriters.put(slice + dirName, res); + return res; + } + + // lazily create MapFile-s. 
+ private MapFile.Writer ensureMapFile(String slice, String dirName, Class clazz) throws IOException { + if (slice == null) slice = DEFAULT_SLICE; + MapFile.Writer res = (MapFile.Writer)sliceWriters.get(slice + dirName); + if (res != null) return res; + String wname; + if (slice == DEFAULT_SLICE) { + wname = new File(new File(job.getOutputDir(), dirName), name).toString(); + } else { + wname = new File(new File(new File(job.getOutputDir(), slice), dirName), name).toString(); + } + res = new MapFile.Writer(fs, wname, UTF8.class, clazz); + sliceWriters.put(slice + dirName, res); + return res; + } + + public void close(Reporter reporter) throws IOException { + Iterator it = sliceWriters.values().iterator(); + while (it.hasNext()) { + Object o = it.next(); + if (o instanceof SequenceFile.Writer) { + ((SequenceFile.Writer)o).close(); + } else { + ((MapFile.Writer)o).close(); + } + } + } + }; + } + } + + public SegmentMerger() { + super(null); + } + + public SegmentMerger(Configuration conf) { + super(conf); + } + + public void setConf(Configuration conf) { + super.setConf(conf); + if (conf == null) return; + if (conf.getBoolean("segment.merger.filter", false)) + filters = new URLFilters(conf); + sliceSize = conf.getLong("segment.merger.slice", -1); + if (sliceSize > 0) + LOG.info("Slice size: " + sliceSize + " URLs."); + } + + public void close() throws IOException { + } + + public void configure(JobConf conf) { + setConf(conf); + if (sliceSize > 0) { + sliceSize = sliceSize / conf.getNumReduceTasks(); + } + } + + public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException { + if (filters != null) { + try { + if (filters.filter(((UTF8)key).toString()) == null) { + return; + } + } catch (Exception e) { + LOG.warning("Cannot filter key " + key + ": " + e.getMessage()); + } + } + output.collect(key, value); + } + + /** + * NOTE: in selecting the latest version we rely exclusively on the segment + * name (not all segment data contain time information). Therefore it is extremely + * important that segments be named in an increasing lexicographic order as + * their creation time increases. 
+ */ + public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { + CrawlDatum lastG = null; + CrawlDatum lastF = null; + CrawlDatum lastSig = null; + Content lastC = null; + ParseData lastPD = null; + ParseText lastPT = null; + String lastGname = null; + String lastFname = null; + String lastSigname = null; + String lastCname = null; + String lastPDname = null; + String lastPTname = null; + TreeMap linked = new TreeMap(); + while (values.hasNext()) { + ObjectWritable wrapper = (ObjectWritable)values.next(); + Object o = wrapper.get(); + if (o instanceof CrawlDatum) { + CrawlDatum val = (CrawlDatum)o; + // check which output dir it belongs to + UTF8 part = (UTF8)val.getMetaData().get(SEGMENT_PART_KEY); + if (part == null) + throw new IOException("Null segment part, key=" + key); + UTF8 uName = (UTF8)val.getMetaData().get(SEGMENT_NAME_KEY); + if (uName == null) + throw new IOException("Null segment name, key=" + key); + String name = uName.toString(); + String partString = part.toString(); + if (partString.equals(CrawlDatum.GENERATE_DIR_NAME)) { + if (lastG == null) { + lastG = val; + lastGname = name; + } else { + // take newer + if (lastGname.compareTo(name) < 0) { + lastG = val; + lastGname = name; + } + } + } else if (partString.equals(CrawlDatum.FETCH_DIR_NAME)) { + if (lastF == null) { + lastF = val; + lastFname = name; + } else { + // take newer + if (lastFname.compareTo(name) < 0) { + lastF = val; + lastFname = name; + } + } + } else if (partString.equals(CrawlDatum.PARSE_DIR_NAME)) { + if (val.getStatus() == CrawlDatum.STATUS_SIGNATURE) { + if (lastSig == null) { + lastSig = val; + lastSigname = name; + } else { + // take newer + if (lastSigname.compareTo(name) < 0) { + lastSig = val; + lastSigname = name; + } + } + continue; + } + // collect all LINKED values from the latest segment + ArrayList segLinked = (ArrayList)linked.get(name); + if (segLinked == null) { + segLinked = new ArrayList(); + linked.put(name, segLinked); + } + segLinked.add(val); + } else { + throw new IOException("Cannot determine segment part: " + partString); + } + } else if (o instanceof Content) { + String name = ((Content)o).getMetadata().get(SEGMENT_NAME_KEY.toString()); + if (lastC == null) { + lastC = (Content)o; + lastCname = name; + } else { + if (lastCname.compareTo(name) < 0) { + lastC = (Content)o; + lastCname = name; + } + } + } else if (o instanceof ParseData) { + String name = ((ParseData)o).getParseMeta().get(SEGMENT_NAME_KEY.toString()); + if (lastPD == null) { + lastPD = (ParseData)o; + lastPDname = name; + } else { + if (lastPDname.compareTo(name) < 0) { + lastPD = (ParseData)o; + lastPDname = name; + } + } + } else if (o instanceof ParseText) { + String text = ((ParseText)o).getText(); + String name = null; + int idx = text.indexOf(nameMarker, nameMarker.length()); + if (idx != -1) { + name = text.substring(nameMarker.length(), idx); + } else { + throw new IOException("Missing segment name marker in ParseText, key " + key + ": " + text); + } + if (lastPT == null) { + lastPT = (ParseText)o; + lastPTname = name; + } else { + if (lastPTname.compareTo(name) < 0) { + lastPT = (ParseText)o; + lastPTname = name; + } + } + } + } + curCount++; + UTF8 sliceName = null; + ObjectWritable wrapper = new ObjectWritable(); + if (sliceSize > 0) { + sliceName = new UTF8(String.valueOf(curCount / sliceSize)); + } + // now output the latest values + if (lastG != null) { + if (sliceName != null) { + lastG.getMetaData().put(SEGMENT_SLICE_KEY, 
sliceName); + } + wrapper.set(lastG); + output.collect(key, wrapper); + } + if (lastF != null) { + if (sliceName != null) { + lastF.getMetaData().put(SEGMENT_SLICE_KEY, sliceName); + } + wrapper.set(lastF); + output.collect(key, wrapper); + } + if (lastSig != null) { + if (sliceName != null) { + lastSig.getMetaData().put(SEGMENT_SLICE_KEY, sliceName); + } + wrapper.set(lastSig); + output.collect(key, wrapper); + } + if (lastC != null) { + if (sliceName != null) { + lastC.getMetadata().set(sliceMarker, sliceName.toString()); + } + wrapper.set(lastC); + output.collect(key, wrapper); + } + if (lastPD != null) { + if (sliceName != null) { + lastPD.getParseMeta().set(sliceMarker, sliceName.toString()); + } + wrapper.set(lastPD); + output.collect(key, wrapper); + } + if (lastPT != null) { + if (sliceName != null) { + lastPT = new ParseText(sliceMarker + sliceName + sliceMarker + + lastPT.getText()); + } + wrapper.set(lastPT); + output.collect(key, wrapper); + } + if (linked.size() > 0) { + String name = (String)linked.lastKey(); + ArrayList segLinked = (ArrayList)linked.get(name); + for (int i = 0; i < segLinked.size(); i++) { + CrawlDatum link = (CrawlDatum)segLinked.get(i); + if (sliceName != null) { + link.getMetaData().put(SEGMENT_SLICE_KEY, sliceName); + } + wrapper.set(link); + output.collect(key, wrapper); + } + } + } + + public void merge(File out, File[] segs, boolean filter, long slice) throws Exception { + LOG.info("Merging " + segs.length + " segments to " + out); + JobConf job = new JobConf(getConf()); + job.setJobName("mergesegs " + out); + job.setBoolean("segment.merger.filter", filter); + job.setLong("segment.merger.slice", slice); + FileSystem fs = FileSystem.get(getConf()); + // prepare the minimal common set of input dirs + boolean g = true; + boolean f = true; + boolean p = true; + boolean c = true; + boolean pd = true; + boolean pt = true; + for (int i = 0; i < segs.length; i++) { + if (!fs.exists(segs[i])) { + LOG.warning("SegmentMerger: input dir " + segs[i] + " doesn't exist, skipping."); + segs[i] = null; + continue; + } + LOG.info("SegmentMerger: adding " + segs[i]); + File cDir = new File(segs[i], Content.DIR_NAME); + File gDir = new File(segs[i], CrawlDatum.GENERATE_DIR_NAME); + File fDir = new File(segs[i], CrawlDatum.FETCH_DIR_NAME); + File pDir = new File(segs[i], CrawlDatum.PARSE_DIR_NAME); + File pdDir = new File(segs[i], ParseData.DIR_NAME); + File ptDir = new File(segs[i], ParseText.DIR_NAME); + c = c && fs.exists(cDir); + g = g && fs.exists(gDir); + f = f && fs.exists(fDir); + p = p && fs.exists(pDir); + pd = pd && fs.exists(pdDir); + pt = pt && fs.exists(ptDir); + } + StringBuffer sb = new StringBuffer(); + if (c) sb.append(" " + Content.DIR_NAME); + if (g) sb.append(" " + CrawlDatum.GENERATE_DIR_NAME); + if (f) sb.append(" " + CrawlDatum.FETCH_DIR_NAME); + if (p) sb.append(" " + CrawlDatum.PARSE_DIR_NAME); + if (pd) sb.append(" " + ParseData.DIR_NAME); + if (pt) sb.append(" " + ParseText.DIR_NAME); + LOG.info("SegmentMerger: using segment data from:" + sb.toString()); + for (int i = 0; i < segs.length; i++) { + if (segs[i] == null) continue; + if (g) { + File gDir = new File(segs[i], CrawlDatum.GENERATE_DIR_NAME); + job.addInputDir(gDir); + } + if (c) { + File cDir = new File(segs[i], Content.DIR_NAME); + job.addInputDir(cDir); + } + if (f) { + File fDir = new File(segs[i], CrawlDatum.FETCH_DIR_NAME); + job.addInputDir(fDir); + } + if (p) { + File pDir = new File(segs[i], CrawlDatum.PARSE_DIR_NAME); + job.addInputDir(pDir); + } + if (pd) { + File pdDir = new 
File(segs[i], ParseData.DIR_NAME); + job.addInputDir(pdDir); + } + if (pt) { + File ptDir = new File(segs[i], ParseText.DIR_NAME); + job.addInputDir(ptDir); + } + } + job.setInputFormat(ObjectInputFormat.class); + job.setInputKeyClass(UTF8.class); + job.setInputValueClass(ObjectWritable.class); + job.setMapperClass(SegmentMerger.class); + job.setReducerClass(SegmentMerger.class); + job.setOutputDir(out); + job.setOutputKeyClass(UTF8.class); + job.setOutputValueClass(ObjectWritable.class); + job.setOutputFormat(SegmentOutputFormat.class); + + setConf(job); + + JobClient.runJob(job); + } + + /** + * @param args + */ + public static void main(String[] args) throws Exception { + if (args.length < 2) { + System.err.println("SegmentMerger output_dir (-dir segments | seg1 seg2 ...) [-filter] [-slice NNNN]"); + System.err.println("\toutput_dir\tname of the resulting segment, or the parent dir of segment slices"); + System.err.println("\t-dir segments\tparent dir containing several segments"); + System.err.println("\tseg1 seg2 ...\tlist of segment dirs"); + System.err.println("\t-filter\t\tfilter out URL-s prohibited by current URLFilters"); + System.err.println("\t-slice NNNN\tcreate many output segments, each containing NNNN URLs"); + return; + } + Configuration conf = NutchConfiguration.create(); + final FileSystem fs = FileSystem.get(conf); + File out = new File(args[0]); + ArrayList segs = new ArrayList(); + long sliceSize = 0; + boolean filter = false; + for (int i = 1; i < args.length; i++) { + if (args[i].equals("-dir")) { + File[] files = fs.listFiles(new File(args[++i]), new FileFilter() { + public boolean accept(File f) { + try { + if (fs.isDirectory(f)) return true; + } catch (IOException e) {} + ; + return false; + } + }); + for (int j = 0; j < files.length; j++) + segs.add(files[j]); + } else if (args[i].equals("-filter")) { + filter = true; + } else if (args[i].equals("-slice")) { + sliceSize = Long.parseLong(args[++i]); + } else { + segs.add(new File(args[i])); + } + } + if (segs.size() == 0) { + System.err.println("ERROR: No input segments."); + return; + } + SegmentMerger merger = new SegmentMerger(conf); + merger.merge(out, (File[]) segs.toArray(new File[segs.size()]), filter, sliceSize); + } + +} Propchange: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java ------------------------------------------------------------------------------ svn:eol-style = native Added: lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbMerger.java URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbMerger.java?rev=405183&view=auto ============================================================================== --- lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbMerger.java (added) +++ lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbMerger.java Mon May 8 14:58:18 2006 @@ -0,0 +1,138 @@ +/** + * Copyright 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nutch.crawl; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.logging.Logger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.MapFile; +import org.apache.hadoop.io.UTF8; +import org.apache.nutch.util.NutchConfiguration; + +import junit.framework.TestCase; + +public class TestCrawlDbMerger extends TestCase { + private static final Logger LOG = Logger.getLogger(CrawlDbMerger.class.getName()); + + String url10 = "http://example.com/"; + String url11 = "http://example.com/foo"; + String url20 = "http://example.com/"; + String url21 = "http://example.com/bar"; + String[] urls_expected = new String[] { + url10, + url11, + url21 + }; + + TreeSet init1 = new TreeSet(); + TreeSet init2 = new TreeSet(); + HashMap expected = new HashMap(); + CrawlDatum cd1, cd2, cd3; + Configuration conf; + FileSystem fs; + File testDir; + CrawlDbReader reader; + + public void setUp() throws Exception { + init1.add(url10); + init1.add(url11); + init2.add(url20); + init2.add(url21); + long time = System.currentTimeMillis(); + cd1 = new CrawlDatum(); + cd1.setFetchInterval(1.0f); + cd1.setFetchTime(time); + cd1.getMetaData().put(new UTF8("name"), new UTF8("cd1")); + cd1.getMetaData().put(new UTF8("cd1"), new UTF8("cd1")); + cd2 = new CrawlDatum(); + cd2.setFetchInterval(1.0f); + cd2.setFetchTime(time + 10000); + cd2.getMetaData().put(new UTF8("name"), new UTF8("cd2")); + cd3 = new CrawlDatum(); + cd3.setFetchInterval(1.0f); + cd3.setFetchTime(time + 10000); + cd3.getMetaData().putAll(cd1.getMetaData()); + cd3.getMetaData().putAll(cd2.getMetaData()); + expected.put(url10, cd3); + expected.put(url11, cd1); + expected.put(url21, cd2); + conf = NutchConfiguration.create(); + fs = FileSystem.get(conf); + testDir = new File("test-crawldb-" + + new java.util.Random().nextInt()); + fs.mkdirs(testDir); + } + + public void tearDown() { + try { + if (fs.exists(testDir)) + fs.delete(testDir); + } catch (Exception e) { } + try { + reader.close(); + } catch (Exception e) { } + } + + public void testMerge() throws Exception { + File crawldb1 = new File(testDir, "crawldb1"); + File crawldb2 = new File(testDir, "crawldb2"); + File output = new File(testDir, "output"); + createCrawlDb(fs, crawldb1, init1, cd1); + createCrawlDb(fs, crawldb2, init2, cd2); + CrawlDbMerger merger = new CrawlDbMerger(conf); + LOG.fine("* merging crawldbs to " + output); + merger.merge(output, new File[]{crawldb1, crawldb2}, false); + LOG.fine("* reading crawldb: " + output); + reader = new CrawlDbReader(); + String crawlDb = output.toString(); + Iterator it = expected.keySet().iterator(); + while (it.hasNext()) { + String url = (String)it.next(); + LOG.fine("url=" + url); + CrawlDatum cd = (CrawlDatum)expected.get(url); + CrawlDatum res = reader.get(crawlDb, url, conf); + LOG.fine(" -> " + res); + System.out.println("url=" + url); + System.out.println(" cd " + cd); + System.out.println(" res " + res); + // may not be null + assertNotNull(res); + assertTrue(cd.equals(res)); + } + reader.close(); + fs.delete(testDir); + } + + private void createCrawlDb(FileSystem fs, File crawldb, TreeSet init, CrawlDatum cd) throws Exception { + LOG.fine("* creating crawldb: " + crawldb); + File dir = new File(crawldb, CrawlDatum.DB_DIR_NAME); + MapFile.Writer writer = new MapFile.Writer(fs, new File(dir, "part-00000").toString(), UTF8.class, 
CrawlDatum.class); + Iterator it = init.iterator(); + while (it.hasNext()) { + String key = (String)it.next(); + writer.append(new UTF8(key), cd); + } + writer.close(); + } +} Propchange: lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbMerger.java ------------------------------------------------------------------------------ svn:eol-style = native Added: lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestLinkDbMerger.java URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestLinkDbMerger.java?rev=405183&view=auto ============================================================================== --- lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestLinkDbMerger.java (added) +++ lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestLinkDbMerger.java Mon May 8 14:58:18 2006 @@ -0,0 +1,160 @@ +/** + * Copyright 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.crawl; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.TreeMap; +import java.util.logging.Logger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.MapFile; +import org.apache.hadoop.io.UTF8; +import org.apache.nutch.util.NutchConfiguration; + +import junit.framework.TestCase; + +public class TestLinkDbMerger extends TestCase { + private static final Logger LOG = Logger.getLogger(TestLinkDbMerger.class.getName()); + + String url10 = "http://example.com/foo"; + String[] urls10 = new String[] { + "http://example.com/100", + "http://example.com/101" + }; + + String url11 = "http://example.com/"; + String[] urls11 = new String[] { + "http://example.com/110", + "http://example.com/111" + }; + + String url20 = "http://example.com/"; + String[] urls20 = new String[] { + "http://foo.com/200", + "http://foo.com/201" + }; + String url21 = "http://example.com/bar"; + String[] urls21 = new String[] { + "http://foo.com/210", + "http://foo.com/211" + }; + + String[] urls10_expected = urls10; + String[] urls11_expected = new String[] { + urls11[0], + urls11[1], + urls20[0], + urls20[1] + }; + String[] urls20_expected = urls11_expected; + String[] urls21_expected = urls21; + + TreeMap init1 = new TreeMap(); + TreeMap init2 = new TreeMap(); + HashMap expected = new HashMap(); + Configuration conf; + File testDir; + FileSystem fs; + LinkDbReader reader; + + public void setUp() throws Exception { + init1.put(url10, urls10); + init1.put(url11, urls11); + init2.put(url20, urls20); + init2.put(url21, urls21); + expected.put(url10, urls10_expected); + expected.put(url11, urls11_expected); + expected.put(url20, urls20_expected); + expected.put(url21, urls21_expected); + conf = NutchConfiguration.create(); + fs = FileSystem.get(conf); + testDir = new File("test-crawldb-" + + new java.util.Random().nextInt()); + fs.mkdirs(testDir); + } + + public void tearDown() { + try { + if 
(fs.exists(testDir)) + fs.delete(testDir); + } catch (Exception e) { } + try { + reader.close(); + } catch (Exception e) { } + } + + public void testMerge() throws Exception { + Configuration conf = NutchConfiguration.create(); + FileSystem fs = FileSystem.get(conf); + File testDir = new File("test-linkdb-" + + new java.util.Random().nextInt()); + fs.mkdirs(testDir); + File linkdb1 = new File(testDir, "linkdb1"); + File linkdb2 = new File(testDir, "linkdb2"); + File output = new File(testDir, "output"); + createLinkDb(fs, linkdb1, init1); + createLinkDb(fs, linkdb2, init2); + LinkDbMerger merger = new LinkDbMerger(conf); + LOG.fine("* merging linkdbs to " + output); + merger.merge(output, new File[]{linkdb1, linkdb2}, false); + LOG.fine("* reading linkdb: " + output); + reader = new LinkDbReader(fs, output, conf); + Iterator it = expected.keySet().iterator(); + while (it.hasNext()) { + String url = (String)it.next(); + LOG.fine("url=" + url); + String[] vals = (String[])expected.get(url); + Inlinks inlinks = reader.getInlinks(new UTF8(url)); + // may not be null + assertNotNull(inlinks); + ArrayList links = new ArrayList(); + Iterator it2 = inlinks.iterator(); + while (it2.hasNext()) { + Inlink in = (Inlink)it2.next(); + links.add(in.getFromUrl()); + } + for (int i = 0; i < vals.length; i++) { + LOG.fine(" -> " + vals[i]); + assertTrue(links.contains(vals[i])); + } + } + reader.close(); + fs.delete(testDir); + } + + private void createLinkDb(FileSystem fs, File linkdb, TreeMap init) throws Exception { + LOG.fine("* creating linkdb: " + linkdb); + File dir = new File(linkdb, LinkDb.CURRENT_NAME); + MapFile.Writer writer = new MapFile.Writer(fs, new File(dir, "part-00000").toString(), UTF8.class, Inlinks.class); + Iterator it = init.keySet().iterator(); + while (it.hasNext()) { + String key = (String)it.next(); + Inlinks inlinks = new Inlinks(); + String[] vals = (String[])init.get(key); + for (int i = 0; i < vals.length; i++) { + Inlink in = new Inlink(vals[i], vals[i]); + inlinks.add(in); + } + writer.append(new UTF8(key), inlinks); + } + writer.close(); + } +} Propchange: lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestLinkDbMerger.java ------------------------------------------------------------------------------ svn:eol-style = native
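
Example invocations of the new commands, as a minimal sketch only: the crawl1/crawldb, crawl2/crawldb, crawl1/linkdb, crawl2/linkdb and crawl/segments paths, the merged output names, and the 50000 slice size are illustrative assumptions, not part of this commit; the command names and argument order follow the bin/nutch additions and the usage strings above.

  # merge two crawldbs into one, applying the configured URLFilters (paths are assumed)
  bin/nutch mergedb crawldb-merged crawl1/crawldb crawl2/crawldb -filter

  # merge two linkdbs, filtering both fromUrls and toUrls
  bin/nutch mergelinkdb linkdb-merged crawl1/linkdb crawl2/linkdb -filter

  # merge all segments found under crawl/segments, slicing the output into
  # segments of at most 50000 URLs each
  bin/nutch mergesegs segments-merged -dir crawl/segments -filter -slice 50000

With -slice, the first argument names the parent directory that will hold the resulting segment slices rather than a single merged segment, as noted in the SegmentMerger usage above.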