> > Secondly, will it still be possible to get the output dumped (ie.
> > segread -dump) to a flat file in large chunks?
>
> In principle, yes, but I have not tested the segread code in the mapred
> branch, and it may need to be updated, as the structure of segments has
> changed a bit.
I'm not a Java programmer, nor do I really understand what is going on,
but I took a crack at reimplementing the most basic version of the
segread code (the full -dump output, sent to stdout). It appears to
function correctly with a single Nutch backend.

I am sure it is not correct to send data to stdout from the reduce()
function, but I'm not sure what other location would be more appropriate
(see the sketch after the attached file). I am hoping that this will
encourage someone to either finish it off or point out the logic issues.

The attached SegmentReader.java goes into org.apache.nutch.crawl, and you
may need to fiddle with the bin/nutch shell script to use it; one
possible invocation is sketched just below.

--
Rod Taylor <[EMAIL PROTECTED]>
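For reference: if the mapred branch's bin/nutch still has the usual
fallback that treats an unrecognized command as a fully qualified class
name, it may work without any script changes at all. The segment path
here is just an example; with a single local backend the dump lands on
the console, so it can be redirected to a flat file:

bin/nutch org.apache.nutch.crawl.SegmentReader crawl/segments/20051001120000 > segdump.txt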
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nutch.crawl;

import org.apache.nutch.io.*;
import org.apache.nutch.fs.*;
import org.apache.nutch.mapred.*;
import org.apache.nutch.util.*;
import org.apache.nutch.protocol.*;
import org.apache.nutch.parse.*;
import org.apache.nutch.net.*;

import java.io.*;
import java.util.*;
import java.util.logging.*;

/* Dump the contents of a segment. */
public class SegmentReader extends NutchConfigured implements Reducer {

  public static final String DIR_NAME = "segdump";

  public static final Logger LOG =
    LogFormatter.getLogger(SegmentReader.class.getName());

  long recNo = 0L;

  /** Wraps inputs in an {@link ObjectWritable}, to permit merging different
   * types in reduce. */
  public static class InputFormat extends SequenceFileInputFormat {
    public RecordReader getRecordReader(NutchFileSystem fs, FileSplit split,
                                        JobConf job, Reporter reporter)
      throws IOException {
      reporter.setStatus(split.toString());

      return new SequenceFileRecordReader(fs, split) {
          public synchronized boolean next(Writable key, Writable value)
            throws IOException {
            ObjectWritable wrapper = (ObjectWritable)value;
            try {
              wrapper.set(getValueClass().newInstance());
            } catch (Exception e) {
              throw new IOException(e.toString());
            }
            return super.next(key, (Writable)wrapper.get());
          }
        };
    }
  }

  /** Output format stub: creates the segdump MapFile, but nothing is
   * written to it yet; reduce() currently prints to stdout instead. */
  public static class OutputFormat
    implements org.apache.nutch.mapred.OutputFormat {
    public RecordWriter getRecordWriter(final NutchFileSystem fs, JobConf job,
                                        String name) throws IOException {
      final File segdump =
        new File(new File(job.getOutputDir(), SegmentReader.DIR_NAME), name);

      // Get the old copy out of the way.
      fs.delete(segdump);

      final MapFile.Writer segdumpOut =
        new MapFile.Writer(fs, segdump.toString(),
                           UTF8.class, SegmentReader.class);

      return new RecordWriter() {
          boolean closed;

          public void write(WritableComparable key, Writable value)
            throws IOException {
            // TODO: unwrap the value and append it to segdumpOut.
          }

          public void close(final Reporter reporter) throws IOException {
            // TODO: spawn a thread to give progress heartbeats while closing.
            segdumpOut.close();
            closed = true;
          }
        };
    }
  }

  public SegmentReader() {
    super(null);
  }

  public SegmentReader(NutchConf conf) {
    super(conf);
  }

  public void configure(JobConf job) {
  }

  public void reduce(WritableComparable key, Iterator values,
                     OutputCollector output, Reporter reporter)
    throws IOException {
    CrawlDatum crawlDatum = null;
    Content content = null;
    ParseData parseData = null;
    ParseText parseText = null;

    while (values.hasNext()) {
      Object value = ((ObjectWritable)values.next()).get(); // unwrap
      if (value instanceof CrawlDatum) {
        crawlDatum = (CrawlDatum)value;
      } else if (value instanceof Content) {
        content = (Content)value;
      } else if (value instanceof ParseData) {
        parseData = (ParseData)value;
      } else if (value instanceof ParseText) {
        parseText = (ParseText)value;
      } else {
        LOG.warning("Unrecognized type: " + value.getClass());
      }
    }

    System.out.println("Recno:: " + recNo++);
    if (crawlDatum != null)
      System.out.println("CrawlDatum::\n" + crawlDatum.toString());
    if (content != null)
      System.out.println("Content::\n" + content.toString());
    if (parseData != null)
      System.out.println("ParseData::\n" + parseData.toString());
    if (parseText != null)
      System.out.println("ParseText::\n" + parseText.toString());
    System.out.println("");
  }

  public void reader(File segment) throws IOException {
    LOG.info("Reader: segment: " + segment);

    JobConf job = new JobConf(getConf());

    job.addInputDir(new File(segment, CrawlDatum.FETCH_DIR_NAME));
    job.addInputDir(new File(segment, Content.DIR_NAME));
    job.addInputDir(new File(segment, ParseData.DIR_NAME));
    job.addInputDir(new File(segment, ParseText.DIR_NAME));
    job.setInputFormat(InputFormat.class);
    job.setInputKeyClass(UTF8.class);
    job.setInputValueClass(ObjectWritable.class);

    job.setReducerClass(SegmentReader.class);

    job.setOutputDir(segment);
    job.setOutputFormat(OutputFormat.class);
    job.setOutputKeyClass(UTF8.class);
    job.setOutputValueClass(ObjectWritable.class);

    JobClient.runJob(job);
    LOG.info("Reader: done");
  }

  public static void main(String[] args) throws Exception {
    SegmentReader segmentReader = new SegmentReader(NutchConf.get());
    String usage = "Usage: SegmentReader <segment>";

    if (args.length == 0) {
      System.err.println(usage);
      System.exit(-1);
    }
    segmentReader.reader(new File(args[0]));
  }
}
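On the stdout question above: here is a sketch (untested, and only my
guess at the right shape) of how reduce() could hand each record to the
output collector instead of printing it, so that the OutputFormat decides
where the dump lands. It assumes the collected values become UTF8 text,
which would also mean declaring the MapFile.Writer above with UTF8.class
as its value class and setting the job's output value class to
UTF8.class:

// Sketch only: build the dump record as text and collect it, instead of
// printing from reduce().
public void reduce(WritableComparable key, Iterator values,
                   OutputCollector output, Reporter reporter)
  throws IOException {
  StringBuffer dump = new StringBuffer();
  dump.append("Recno:: ").append(recNo++).append("\n");
  while (values.hasNext()) {
    Object value = ((ObjectWritable)values.next()).get(); // unwrap
    // Label each part with its short class name, e.g. "Content::",
    // matching the labels the stdout version prints.
    String name = value.getClass().getName();
    name = name.substring(name.lastIndexOf('.') + 1);
    dump.append(name).append("::\n").append(value.toString()).append("\n");
  }
  output.collect(key, new UTF8(dump.toString()));
}

// ...and in OutputFormat's RecordWriter, append each collected record.
// Reduce keys arrive in sorted order, which is what MapFile requires.
public void write(WritableComparable key, Writable value)
  throws IOException {
  segdumpOut.append(key, value);
}

With something like that in place, the dump ends up in the segdump files
under the segment, which could then be read out to a flat file in large
chunks, as discussed above.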
