Author: ab
Date: Tue May 30 14:12:52 2006
New Revision: 410377
URL: http://svn.apache.org/viewvc?rev=410377&view=rev
Log:
SegmentMerger bug fixes and improvements:
* Replace the deprecated use of java.io.File with Hadoop's Path.
* The old segment name stored in Content.metadata needs to be replaced
with the new segment name; stale names were causing NPEs when
retrieving hit summaries (see the sketch below).
* SegmentMerger will now always create its output in a subdirectory
of the output_dir argument. All newly created segments follow the
same naming convention as other segments (i.e. yyyyMMddHHmmss), with
sequential suffixes for sliced segments (yyyyMMddHHmmss-NN).
Rename Generator.getDate() to Generator.generateSegmentName() and make
it public. This method now also tries to ensure that generated segment
names are unique (see the usage sketch after the Generator.java diff).
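
For illustration, a minimal sketch of the metadata relabeling idea,
simplified from the SegmentMerger diff below (the key constant here is
a placeholder; the real code uses Fetcher.SEGMENT_NAME_KEY):

    import org.apache.nutch.metadata.Metadata;

    public class SegmentNameRelabel {
      // Placeholder key -- the actual code uses Fetcher.SEGMENT_NAME_KEY.
      private static final String SEGMENT_NAME_KEY = "segment";

      /** Overwrite the stale, pre-merge segment name so that consumers
       *  resolving segments by name (e.g. hit summaries) find the merged
       *  segment instead of getting null back. */
      public static void relabel(Metadata meta, String segmentName,
                                 String slice) {
        if (slice == null) {
          meta.set(SEGMENT_NAME_KEY, segmentName);
        } else {
          meta.set(SEGMENT_NAME_KEY, segmentName + "-" + slice);
        }
      }
    }
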
Modified:
lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java
lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java?rev=410377&r1=410376&r2=410377&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java Tue May 30 14:12:52 2006
@@ -243,7 +243,7 @@
"/generate-temp-"+
Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
- Path segment = new Path(segments, getDate());
+ Path segment = new Path(segments, generateSegmentName());
Path output = new Path(segment, CrawlDatum.GENERATE_DIR_NAME);
LOG.info("Generator: starting");
@@ -305,9 +305,14 @@
return segment;
}
+
+ private static SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
- private static String getDate() {
- return new SimpleDateFormat("yyyyMMddHHmmss").format
+ public static synchronized String generateSegmentName() {
+ try {
+ Thread.sleep(1000);
+ } catch (Throwable t) {};
+ return sdf.format
(new Date(System.currentTimeMillis()));
}
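
As an aside, a hypothetical caller (not part of this commit) showing the
effect of the change above: generateSegmentName() is synchronized and
sleeps about one second, so two calls within one JVM cannot land in the
same yyyyMMddHHmmss second. Note this does not guard against separate
JVMs picking the same name.

    import org.apache.nutch.crawl.Generator;

    public class SegmentNameDemo {
      public static void main(String[] args) {
        String first = Generator.generateSegmentName();
        String second = Generator.generateSegmentName();
        // e.g. "20060530141252" then "20060530141253" -- never equal
        // within a single JVM
        System.out.println(first + " != " + second);
      }
    }
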
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java?rev=410377&r1=410376&r2=410377&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java Tue May 30 14:12:52 2006
@@ -16,17 +16,19 @@
package org.apache.nutch.segment;
-import java.io.File;
-import java.io.FileFilter;
import java.io.IOException;
import java.util.*;
import java.util.logging.Logger;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.Generator;
+import org.apache.nutch.fetcher.Fetcher;
import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.parse.ParseData;
@@ -104,7 +106,7 @@
reporter.setStatus(split.toString());
// find part name
- String dir = split.getFile().toString().replace('\\', '/');
+ String dir = split.getPath().toString().replace('\\', '/');
int idx = dir.lastIndexOf("/part-");
if (idx == -1) {
throw new IOException("Cannot determine segment part: " + dir);
@@ -173,6 +175,7 @@
SequenceFile.Writer g_out = null;
SequenceFile.Writer p_out = null;
HashMap sliceWriters = new HashMap();
+ String segmentName = job.get("segment.merger.segmentName");
public void write(WritableComparable key, Writable value) throws IOException {
// unwrap
@@ -205,12 +208,24 @@
slice = ((Content)o).getMetadata().get(sliceMarker);
((Content)o).getMetadata().remove(sliceMarker);
((Content)o).getMetadata().remove(nameMarker);
+ // update the segment name inside metadata
+ if (slice == null) {
+ ((Content)o).getMetadata().set(Fetcher.SEGMENT_NAME_KEY, segmentName);
+ } else {
+ ((Content)o).getMetadata().set(Fetcher.SEGMENT_NAME_KEY, segmentName + "-" + slice);
+ }
c_out = ensureMapFile(slice, Content.DIR_NAME, Content.class);
c_out.append(key, o);
} else if (o instanceof ParseData) {
slice = ((ParseData)o).getParseMeta().get(sliceMarker);
((ParseData)o).getParseMeta().remove(sliceMarker);
((ParseData)o).getParseMeta().remove(nameMarker);
+ // update the segment name inside contentMeta - required by Indexer
+ if (slice == null) {
+ ((ParseData)o).getContentMeta().set(Fetcher.SEGMENT_NAME_KEY, segmentName);
+ } else {
+ ((ParseData)o).getContentMeta().set(Fetcher.SEGMENT_NAME_KEY, segmentName + "-" + slice);
+ }
pd_out = ensureMapFile(slice, ParseData.DIR_NAME, ParseData.class);
pd_out.append(key, o);
} else if (o instanceof ParseText) {
@@ -243,11 +258,11 @@
if (slice == null) slice = DEFAULT_SLICE;
SequenceFile.Writer res = (SequenceFile.Writer)sliceWriters.get(slice + dirName);
if (res != null) return res;
- String wname;
+ Path wname;
if (slice == DEFAULT_SLICE) {
- wname = new File(new File(job.getOutputDir(), dirName), name).toString();
+ wname = new Path(new Path(new Path(job.getOutputPath(), segmentName), dirName), name);
} else {
- wname = new File(new File(new File(job.getOutputDir(), slice), dirName), name).toString();
+ wname = new Path(new Path(new Path(job.getOutputPath(), segmentName + "-" + slice), dirName), name);
}
res = new SequenceFile.Writer(fs, wname, UTF8.class, CrawlDatum.class);
sliceWriters.put(slice + dirName, res);
@@ -259,13 +274,13 @@
if (slice == null) slice = DEFAULT_SLICE;
MapFile.Writer res = (MapFile.Writer)sliceWriters.get(slice + dirName);
if (res != null) return res;
- String wname;
+ Path wname;
if (slice == DEFAULT_SLICE) {
- wname = new File(new File(job.getOutputDir(), dirName), name).toString();
+ wname = new Path(new Path(new Path(job.getOutputPath(), segmentName), dirName), name);
} else {
- wname = new File(new File(new File(job.getOutputDir(), slice), dirName), name).toString();
+ wname = new Path(new Path(new Path(job.getOutputPath(), segmentName + "-" + slice), dirName), name);
}
- res = new MapFile.Writer(fs, wname, UTF8.class, clazz);
+ res = new MapFile.Writer(fs, wname.toString(), UTF8.class, clazz);
sliceWriters.put(slice + dirName, res);
return res;
}
@@ -512,12 +527,14 @@
}
}
- public void merge(File out, File[] segs, boolean filter, long slice) throws Exception {
- LOG.info("Merging " + segs.length + " segments to " + out);
+ public void merge(Path out, Path[] segs, boolean filter, long slice) throws Exception {
+ String segmentName = Generator.generateSegmentName();
JobConf job = new JobConf(getConf());
- job.setJobName("mergesegs " + out);
+ job.setJobName("mergesegs " + out + "/" + segmentName);
+ LOG.info("Merging " + segs.length + " segments to " + out + "/" +
segmentName);
job.setBoolean("segment.merger.filter", filter);
job.setLong("segment.merger.slice", slice);
+ job.set("segment.merger.segmentName", segmentName);
FileSystem fs = FileSystem.get(getConf());
// prepare the minimal common set of input dirs
boolean g = true;
@@ -533,12 +550,12 @@
continue;
}
LOG.info("SegmentMerger: adding " + segs[i]);
- File cDir = new File(segs[i], Content.DIR_NAME);
- File gDir = new File(segs[i], CrawlDatum.GENERATE_DIR_NAME);
- File fDir = new File(segs[i], CrawlDatum.FETCH_DIR_NAME);
- File pDir = new File(segs[i], CrawlDatum.PARSE_DIR_NAME);
- File pdDir = new File(segs[i], ParseData.DIR_NAME);
- File ptDir = new File(segs[i], ParseText.DIR_NAME);
+ Path cDir = new Path(segs[i], Content.DIR_NAME);
+ Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME);
+ Path fDir = new Path(segs[i], CrawlDatum.FETCH_DIR_NAME);
+ Path pDir = new Path(segs[i], CrawlDatum.PARSE_DIR_NAME);
+ Path pdDir = new Path(segs[i], ParseData.DIR_NAME);
+ Path ptDir = new Path(segs[i], ParseText.DIR_NAME);
c = c && fs.exists(cDir);
g = g && fs.exists(gDir);
f = f && fs.exists(fDir);
@@ -557,28 +574,28 @@
for (int i = 0; i < segs.length; i++) {
if (segs[i] == null) continue;
if (g) {
- File gDir = new File(segs[i], CrawlDatum.GENERATE_DIR_NAME);
- job.addInputDir(gDir);
+ Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME);
+ job.addInputPath(gDir);
}
if (c) {
- File cDir = new File(segs[i], Content.DIR_NAME);
- job.addInputDir(cDir);
+ Path cDir = new Path(segs[i], Content.DIR_NAME);
+ job.addInputPath(cDir);
}
if (f) {
- File fDir = new File(segs[i], CrawlDatum.FETCH_DIR_NAME);
- job.addInputDir(fDir);
+ Path fDir = new Path(segs[i], CrawlDatum.FETCH_DIR_NAME);
+ job.addInputPath(fDir);
}
if (p) {
- File pDir = new File(segs[i], CrawlDatum.PARSE_DIR_NAME);
- job.addInputDir(pDir);
+ Path pDir = new Path(segs[i], CrawlDatum.PARSE_DIR_NAME);
+ job.addInputPath(pDir);
}
if (pd) {
- File pdDir = new File(segs[i], ParseData.DIR_NAME);
- job.addInputDir(pdDir);
+ Path pdDir = new Path(segs[i], ParseData.DIR_NAME);
+ job.addInputPath(pdDir);
}
if (pt) {
- File ptDir = new File(segs[i], ParseText.DIR_NAME);
- job.addInputDir(ptDir);
+ Path ptDir = new Path(segs[i], ParseText.DIR_NAME);
+ job.addInputPath(ptDir);
}
}
job.setInputFormat(ObjectInputFormat.class);
@@ -586,7 +603,7 @@
job.setInputValueClass(ObjectWritable.class);
job.setMapperClass(SegmentMerger.class);
job.setReducerClass(SegmentMerger.class);
- job.setOutputDir(out);
+ job.setOutputPath(out);
job.setOutputKeyClass(UTF8.class);
job.setOutputValueClass(ObjectWritable.class);
job.setOutputFormat(SegmentOutputFormat.class);
@@ -602,7 +619,7 @@
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("SegmentMerger output_dir (-dir segments | seg1 seg2
...) [-filter] [-slice NNNN]");
- System.err.println("\toutput_dir\tname of the resulting segment, or the parent dir of segment slices");
+ System.err.println("\toutput_dir\tname of the parent dir for output segment slice(s)");
System.err.println("\t-dir segments\tparent dir containing several segments");
System.err.println("\tseg1 seg2 ...\tlist of segment dirs");
System.err.println("\t-filter\t\tfilter out URL-s prohibited by current URLFilters");
@@ -611,14 +628,14 @@
}
Configuration conf = NutchConfiguration.create();
final FileSystem fs = FileSystem.get(conf);
- File out = new File(args[0]);
+ Path out = new Path(args[0]);
ArrayList segs = new ArrayList();
long sliceSize = 0;
boolean filter = false;
for (int i = 1; i < args.length; i++) {
if (args[i].equals("-dir")) {
- File[] files = fs.listFiles(new File(args[++i]), new FileFilter() {
- public boolean accept(File f) {
+ Path[] files = fs.listPaths(new Path(args[++i]), new PathFilter() {
+ public boolean accept(Path f) {
try {
if (fs.isDirectory(f)) return true;
} catch (IOException e) {}
@@ -633,7 +650,7 @@
} else if (args[i].equals("-slice")) {
sliceSize = Long.parseLong(args[++i]);
} else {
- segs.add(new File(args[i]));
+ segs.add(new Path(args[i]));
}
}
if (segs.size() == 0) {
@@ -641,7 +658,7 @@
return;
}
SegmentMerger merger = new SegmentMerger(conf);
- merger.merge(out, (File[]) segs.toArray(new File[segs.size()]), filter, sliceSize);
+ merger.merge(out, (Path[]) segs.toArray(new Path[segs.size()]), filter, sliceSize);
}
}
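
For reference, a hypothetical invocation under the new semantics
(directory names are made up, and this assumes the usual bin/nutch
CLASSNAME launcher):

    bin/nutch org.apache.nutch.segment.SegmentMerger crawl/merged \
        -dir crawl/segments -filter -slice 50000

This would create crawl/merged/yyyyMMddHHmmss-NN slices (of at most
50000 URLs each) instead of writing directly into crawl/merged as the
old code did.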