Author: ab
Date: Wed Jan 17 13:06:50 2007
New Revision: 497172

URL: http://svn.apache.org/viewvc?view=rev&rev=497172
Log:
Revert accidental change to bin/nutch.
Fix Fetcher.java to correctly split input.
Add Fetcher2 - a queue-based fetcher implementation.
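The new implementation can be driven either through the fetch2 command added to bin/nutch below, or programmatically via the fetch(Path, int, boolean) entry point of Fetcher2. A minimal sketch of the programmatic route follows; the segment path and the class name Fetcher2Demo are illustrative, not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.nutch.fetcher.Fetcher2;
    import org.apache.nutch.util.NutchConfiguration;

    public class Fetcher2Demo {
      public static void main(String[] args) throws Exception {
        Configuration conf = NutchConfiguration.create();
        // placeholder path - a real segment is created by the 'generate' step
        Path segment = new Path("crawl/segments/20070117130650");
        Fetcher2 fetcher = new Fetcher2(conf);
        fetcher.fetch(segment, 10, true); // 10 threads, parsing enabled
      }
    }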
Added:
    lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher2.java   (with props)
Modified:
    lucene/nutch/trunk/bin/nutch
    lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java

Modified: lucene/nutch/trunk/bin/nutch
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/bin/nutch?view=diff&rev=497172&r1=497171&r2=497172
==============================================================================
--- lucene/nutch/trunk/bin/nutch (original)
+++ lucene/nutch/trunk/bin/nutch Wed Jan 17 13:06:50 2007
@@ -41,6 +41,7 @@
   echo "  generate          generate new segments to fetch from crawl db"
   echo "  freegen           generate new segments to fetch from text files"
   echo "  fetch             fetch a segment's pages"
+  echo "  fetch2            fetch a segment's pages using Fetcher2 implementation"
   echo "  parse             parse a segment's pages"
   echo "  readseg           read / dump segment data"
   echo "  mergesegs         merge several segments, with optional filtering and slicing"
@@ -177,6 +178,8 @@
   CLASS=org.apache.nutch.tools.FreeGenerator
 elif [ "$COMMAND" = "fetch" ] ; then
   CLASS=org.apache.nutch.fetcher.Fetcher
+elif [ "$COMMAND" = "fetch2" ] ; then
+  CLASS=org.apache.nutch.fetcher.Fetcher2
 elif [ "$COMMAND" = "parse" ] ; then
   CLASS=org.apache.nutch.parse.ParseSegment
 elif [ "$COMMAND" = "readdb" ] ; then
@@ -220,6 +223,5 @@
 fi

 # run it
-echo "$JAVA" $JAVA_HEAP_MAX $NUTCH_OPTS -classpath "$CLASSPATH" $CLASS "$@"
 exec "$JAVA" $JAVA_HEAP_MAX $NUTCH_OPTS -classpath "$CLASSPATH" $CLASS "$@"

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java?view=diff&rev=497172&r1=497171&r2=497172
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java Wed Jan 17 13:06:50 2007
@@ -48,9 +48,10 @@
   public static class InputFormat extends SequenceFileInputFormat {
     /** Don't split inputs, to keep things polite. */
-    public InputSplit[] getSplits(FileSystem fs, JobConf job, int nSplits)
+    public InputSplit[] getSplits(JobConf job, int nSplits)
       throws IOException {
       Path[] files = listPaths(job);
+      FileSystem fs = FileSystem.get(job);
       InputSplit[] splits = new InputSplit[files.length];
       for (int i = 0; i < files.length; i++) {
         splits[i] = new FileSplit(files[i], 0, fs.getLength(files[i]), job);
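A note on the Fetcher.java hunk: the old three-argument getSplits(FileSystem, JobConf, int) presumably no longer matched the InputFormat method that Hadoop actually invokes, so the default splitting behavior of SequenceFileInputFormat applied and a segment file could be split mid-file. The replacement overrides the two-argument signature and derives the FileSystem from the job configuration. The same "one split per file" pattern as a self-contained sketch; the class name is illustrative, assuming the mapred API of that era:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SequenceFileInputFormat;

    // Illustrative only: a SequenceFile input format that never splits files,
    // mirroring the pattern used by Fetcher.InputFormat above.
    public class WholeFileInputFormat extends SequenceFileInputFormat {
      public InputSplit[] getSplits(JobConf job, int nSplits) throws IOException {
        Path[] files = listPaths(job);
        FileSystem fs = FileSystem.get(job); // derive fs from the job conf
        InputSplit[] splits = new InputSplit[files.length];
        for (int i = 0; i < files.length; i++) {
          // one split spanning the whole file keeps a segment part in one map task
          splits[i] = new FileSplit(files[i], 0, fs.getLength(files[i]), job);
        }
        return splits;
      }
    }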
Added: lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher2.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher2.java?view=auto&rev=497172
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher2.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher2.java Wed Jan 17 13:06:50 2007
@@ -0,0 +1,875 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.fetcher;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Commons Logging imports
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.SignatureFactory;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.*;
+import org.apache.nutch.protocol.*;
+import org.apache.nutch.parse.*;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.*;
+
+
+/**
+ * A queue-based fetcher.
+ *
+ * <p>This fetcher uses a well-known model of one producer (a QueueFeeder)
+ * and many consumers (FetcherThread-s).
+ *
+ * <p>QueueFeeder reads input fetchlists and
+ * populates a set of FetchItemQueue-s, which hold FetchItem-s that
+ * describe the items to be fetched. There are as many queues as there are unique
+ * hosts, but at any given time the total number of fetch items in all queues
+ * is less than a fixed number (currently set to a multiple of the number of
+ * threads).
+ *
+ * <p>As items are consumed from the queues, the QueueFeeder continues to add new
+ * input items, so that their total count stays fixed (FetcherThread-s may also
+ * add new items to the queues, e.g. as a result of redirection) - until all
+ * input items are exhausted, at which point the number of items in the queues
+ * begins to decrease. When this number reaches 0 the fetcher will finish.
+ *
+ * <p>This fetcher implementation handles per-host blocking itself, instead
+ * of delegating this work to protocol-specific plugins.
+ * Each per-host queue handles its own "politeness" settings, such as the
+ * maximum number of concurrent requests and the crawl delay between consecutive
+ * requests - and also a list of requests in progress, and the time the last
+ * request finished. As FetcherThread-s ask for new items to fetch,
+ * queues may return eligible items, or null if for "politeness" reasons this
+ * host's queue is not yet ready.
+ *
+ * <p>If there are still unfetched items in the queues, but none of the items
+ * are ready, FetcherThread-s will spin-wait until either some items become
+ * available, or a timeout is reached (at which point the Fetcher will abort,
+ * assuming the task is hung).
+ *
+ * @author Andrzej Bialecki
+ */
+public class Fetcher2 extends Configured implements MapRunnable {
+
+  public static final Log LOG = LogFactory.getLog(Fetcher2.class);
+
+  public static class InputFormat extends SequenceFileInputFormat {
+    /** Don't split inputs, to keep things polite. */
+    public InputSplit[] getSplits(JobConf job, int nSplits)
+      throws IOException {
+      Path[] files = listPaths(job);
+      FileSplit[] splits = new FileSplit[files.length];
+      FileSystem fs = FileSystem.get(job);
+      for (int i = 0; i < files.length; i++) {
+        splits[i] = new FileSplit(files[i], 0, fs.getLength(files[i]), job);
+      }
+      return splits;
+    }
+  }
+
+  private OutputCollector output;
+  private Reporter reporter;
+
+  private String segmentName;
+  private AtomicInteger activeThreads = new AtomicInteger(0);
+  private AtomicInteger spinWaiting = new AtomicInteger(0);
+
+  private long start = System.currentTimeMillis(); // start time of fetcher run
+  private AtomicLong lastRequestStart = new AtomicLong(start);
+
+  private AtomicLong bytes = new AtomicLong(0);        // total bytes fetched
+  private AtomicInteger pages = new AtomicInteger(0);  // total pages fetched
+  private AtomicInteger errors = new AtomicInteger(0); // total pages errored
+
+  private boolean storingContent;
+  private boolean parsing;
+  FetchItemQueues fetchQueues;
+  QueueFeeder feeder;
+
+  /**
+   * This class describes an item to be fetched.
+   */
+  private static class FetchItem {
+    String queueID;
+    Text url;
+    URL u;
+    CrawlDatum datum;
+
+    public FetchItem(Text url, URL u, CrawlDatum datum, String queueID) {
+      this.url = url;
+      this.u = u;
+      this.datum = datum;
+      this.queueID = queueID;
+    }
+
+    /** Create an item. Queue id will be created based on <code>byIP</code>
+     * argument, either as a protocol + hostname pair, or protocol + IP
+     * address pair.
+     */
+    public static FetchItem create(Text url, CrawlDatum datum, boolean byIP) {
+      String queueID;
+      URL u = null;
+      try {
+        u = new URL(url.toString());
+      } catch (Exception e) {
+        LOG.warn("Cannot parse url: " + url, e);
+        return null;
+      }
+      String proto = u.getProtocol().toLowerCase();
+      String host;
+      if (byIP) {
+        try {
+          InetAddress addr = InetAddress.getByName(u.getHost());
+          host = addr.getHostAddress();
+        } catch (UnknownHostException e) {
+          // unable to resolve it, so don't fall back to host name
+          LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
+          return null;
+        }
+      } else {
+        host = u.getHost();
+        if (host == null) {
+          LOG.warn("Unknown host for url: " + url + ", skipping.");
+          return null;
+        }
+        host = host.toLowerCase();
+      }
+      queueID = proto + "://" + host;
+      return new FetchItem(url, u, datum, queueID);
+    }
+
+    public CrawlDatum getDatum() {
+      return datum;
+    }
+
+    public String getQueueID() {
+      return queueID;
+    }
+
+    public Text getUrl() {
+      return url;
+    }
+
+    public URL getURL2() {
+      return u;
+    }
+  }
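To make the queue partitioning concrete: in host-name mode (byIP == false) two URLs that differ only in path or in the case of the host map to the same queue ID, and therefore share one set of politeness limits. A small standalone sketch; the helper below is illustrative, not part of the patch:

    import java.net.URL;

    public class QueueIdDemo {
      // Mirrors FetchItem.create() in host-name mode (byIP == false).
      static String queueId(String url) throws Exception {
        URL u = new URL(url);
        return u.getProtocol().toLowerCase() + "://" + u.getHost().toLowerCase();
      }

      public static void main(String[] args) throws Exception {
        // Both URLs map to the same queue, so they share politeness limits.
        System.out.println(queueId("http://Example.COM/a.html")); // http://example.com
        System.out.println(queueId("http://example.com/b.html")); // http://example.com
      }
    }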
+ */ + private static class FetchItemQueue { + List<FetchItem> queue = Collections.synchronizedList(new LinkedList<FetchItem>()); + Set<FetchItem> inProgress = Collections.synchronizedSet(new HashSet<FetchItem>()); + AtomicLong endTime = new AtomicLong(); + long crawlDelay; + long minCrawlDelay; + int maxThreads; + Configuration conf; + + public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay, long minCrawlDelay) { + this.conf = conf; + this.maxThreads = maxThreads; + this.crawlDelay = crawlDelay; + this.minCrawlDelay = minCrawlDelay; + // ready to start + this.endTime.set(System.currentTimeMillis() - crawlDelay); + } + + public int getQueueSize() { + return queue.size(); + } + + public int getInProgressSize() { + return inProgress.size(); + } + + public void finishFetchItem(FetchItem it) { + if (it != null) { + inProgress.remove(it); + endTime.set(System.currentTimeMillis()); + } + } + + public void addFetchItem(FetchItem it) { + if (it == null) return; + queue.add(it); + } + + public void addInProgressFetchItem(FetchItem it) { + if (it == null) return; + inProgress.add(it); + } + + public FetchItem getFetchItem() { + if (inProgress.size() >= maxThreads) return null; + long now = System.currentTimeMillis(); + long last = endTime.get() + (maxThreads > 1 ? crawlDelay : minCrawlDelay); + if (last > now) return null; + FetchItem it = null; + if (queue.size() == 0) return null; + try { + it = queue.remove(0); + inProgress.add(it); + } catch (Exception e) { + + } + return it; + } + + public synchronized void dump() { + LOG.info(" maxThreads = " + maxThreads); + LOG.info(" inProgress = " + inProgress.size()); + LOG.info(" crawlDelay = " + crawlDelay); + LOG.info(" minCrawlDelay = " + minCrawlDelay); + LOG.info(" endTime = " + endTime.get()); + LOG.info(" now = " + System.currentTimeMillis()); + for (int i = 0; i < queue.size(); i++) { + FetchItem it = queue.get(i); + LOG.info(" " + i + ". " + it.url); + } + } + } + + /** + * Convenience class - a collection of queues that keeps track of the total + * number of items, and provides items eligible for fetching from any queue. 
+ */ + private static class FetchItemQueues { + public static final String DEFAULT_ID = "default"; + Map<String, FetchItemQueue> queues = new HashMap<String, FetchItemQueue>(); + AtomicInteger totalSize = new AtomicInteger(0); + int maxThreads; + boolean byIP; + long crawlDelay; + long minCrawlDelay; + Configuration conf; + + public FetchItemQueues(Configuration conf) { + this.conf = conf; + this.maxThreads = conf.getInt("fetcher.threads.per.host", 1); + // backward-compatible default setting + this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", false); + this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000); + this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000); + } + + public int getTotalSize() { + return totalSize.get(); + } + + public int getQueueCount() { + return queues.size(); + } + + public void addFetchItem(Text url, CrawlDatum datum) { + FetchItem it = FetchItem.create(url, datum, byIP); + if (it != null) addFetchItem(it); + } + + public void addFetchItem(FetchItem it) { + FetchItemQueue fiq = getFetchItemQueue(it.queueID); + fiq.addFetchItem(it); + totalSize.incrementAndGet(); + } + + public void finishFetchItem(FetchItem it) { + FetchItemQueue fiq = queues.get(it.queueID); + if (fiq == null) { + LOG.warn("Attempting to finish item from unknown queue: " + it); + return; + } + fiq.finishFetchItem(it); + } + + public synchronized FetchItemQueue getFetchItemQueue(String id) { + FetchItemQueue fiq = queues.get(id); + if (fiq == null) { + // initialize queue + fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay); + queues.put(id, fiq); + } + return fiq; + } + + public synchronized FetchItem getFetchItem() { + Iterator it = queues.keySet().iterator(); + while (it.hasNext()) { + FetchItemQueue fiq = queues.get(it.next()); + // reap empty queues + if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) { + it.remove(); + continue; + } + FetchItem fit = fiq.getFetchItem(); + if (fit != null) { + totalSize.decrementAndGet(); + return fit; + } + } + return null; + } + + public synchronized void dump() { + Iterator it = queues.keySet().iterator(); + for (String id : queues.keySet()) { + FetchItemQueue fiq = queues.get(id); + if (fiq.getQueueSize() == 0) continue; + LOG.info("* queue: " + id); + fiq.dump(); + } + } + } + + /** + * This class feeds the queues with input items, and re-fills them as + * items are consumed by FetcherThread-s. 
+ */ + private static class QueueFeeder extends Thread { + private RecordReader reader; + private FetchItemQueues queues; + private int size; + + public QueueFeeder(RecordReader reader, FetchItemQueues queues, int size) { + this.reader = reader; + this.queues = queues; + this.size = size; + this.setDaemon(true); + this.setName("QueueFeeder"); + } + + public void run() { + boolean hasMore = true; + int cnt = 0; + + while (hasMore) { + int feed = size - queues.getTotalSize(); + if (feed <= 0) { + // queues are full - spin-wait until they have some free space + try { + Thread.sleep(1000); + } catch (Exception e) {}; + continue; + } else { + LOG.debug("-feeding " + feed + " input urls ..."); + while (feed > 0 && hasMore) { + try { + Text url = new Text(); + CrawlDatum datum = new CrawlDatum(); + hasMore = reader.next(url, datum); + if (hasMore) { + queues.addFetchItem(url, datum); + cnt++; + feed--; + } + } catch (IOException e) { + LOG.fatal("QueueFeeder error reading input, record " + cnt, e); + return; + } + } + } + } + LOG.info("QueueFeeder finished: total " + cnt + " records."); + } + } + + /** + * This class picks items from queues and fetches the pages. + */ + private class FetcherThread extends Thread { + private Configuration conf; + private URLFilters urlFilters; + private ScoringFilters scfilters; + private ParseUtil parseUtil; + private URLNormalizers normalizers; + private ProtocolFactory protocolFactory; + private long maxCrawlDelay; + private boolean byIP; + private int maxRedirect; + + public FetcherThread(Configuration conf) { + this.setDaemon(true); // don't hang JVM on exit + this.setName("FetcherThread"); // use an informative name + this.conf = conf; + this.urlFilters = new URLFilters(conf); + this.scfilters = new ScoringFilters(conf); + this.parseUtil = new ParseUtil(conf); + this.protocolFactory = new ProtocolFactory(conf); + this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER); + this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000; + // backward-compatible default setting + this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", true); + this.maxRedirect = conf.getInt("http.redirect.max", 3); + } + + public void run() { + activeThreads.incrementAndGet(); // count threads + + FetchItem fit = null; + try { + + while (true) { + fit = fetchQueues.getFetchItem(); + if (fit == null) { + if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) { + LOG.debug(getName() + " spin-waiting ..."); + // spin-wait. 
+
+  /**
+   * This class picks items from queues and fetches the pages.
+   */
+  private class FetcherThread extends Thread {
+    private Configuration conf;
+    private URLFilters urlFilters;
+    private ScoringFilters scfilters;
+    private ParseUtil parseUtil;
+    private URLNormalizers normalizers;
+    private ProtocolFactory protocolFactory;
+    private long maxCrawlDelay;
+    private boolean byIP;
+    private int maxRedirect;
+
+    public FetcherThread(Configuration conf) {
+      this.setDaemon(true);          // don't hang JVM on exit
+      this.setName("FetcherThread"); // use an informative name
+      this.conf = conf;
+      this.urlFilters = new URLFilters(conf);
+      this.scfilters = new ScoringFilters(conf);
+      this.parseUtil = new ParseUtil(conf);
+      this.protocolFactory = new ProtocolFactory(conf);
+      this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER);
+      this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000;
+      // backward-compatible default setting
+      // NOTE: FetchItemQueues uses 'false' as the default for the same key
+      this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", true);
+      this.maxRedirect = conf.getInt("http.redirect.max", 3);
+    }
+
+    public void run() {
+      activeThreads.incrementAndGet(); // count threads
+
+      FetchItem fit = null;
+      try {
+
+        while (true) {
+          fit = fetchQueues.getFetchItem();
+          if (fit == null) {
+            if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
+              LOG.debug(getName() + " spin-waiting ...");
+              // spin-wait.
+              spinWaiting.incrementAndGet();
+              try {
+                Thread.sleep(500);
+              } catch (Exception e) {}
+              spinWaiting.decrementAndGet();
+              continue;
+            } else {
+              // all done, finish this thread
+              return;
+            }
+          }
+          lastRequestStart.set(System.currentTimeMillis());
+          try {
+            if (LOG.isInfoEnabled()) { LOG.info("fetching " + fit.url); }
+
+            // fetch the page
+            boolean redirecting = false;
+            int redirectCount = 0;
+            do {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("redirectCount=" + redirectCount);
+              }
+              redirecting = false;
+              Protocol protocol = this.protocolFactory.getProtocol(fit.url.toString());
+              RobotRules rules = protocol.getRobotRules(fit.url, fit.datum);
+              if (!rules.isAllowed(fit.u)) {
+                // unblock
+                fetchQueues.finishFetchItem(fit);
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Denied by robots.txt: " + fit.url);
+                }
+                output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
+                continue;
+              }
+              if (rules.getCrawlDelay() > 0) {
+                if (rules.getCrawlDelay() > maxCrawlDelay) {
+                  // unblock
+                  fetchQueues.finishFetchItem(fit);
+                  LOG.debug("Crawl-Delay for " + fit.url + " too long (" + rules.getCrawlDelay() + "), skipping");
+                  output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
+                  continue;
+                } else {
+                  FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
+                  fiq.crawlDelay = rules.getCrawlDelay();
+                }
+              }
+              ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.datum);
+              ProtocolStatus status = output.getStatus();
+              Content content = output.getContent();
+              ParseStatus pstatus = null;
+              // unblock queue
+              fetchQueues.finishFetchItem(fit);
+
+              switch(status.getCode()) {
+
+              case ProtocolStatus.WOULDBLOCK:
+                // unblock
+                fetchQueues.finishFetchItem(fit);
+                // retry ?
+                fetchQueues.addFetchItem(fit);
+                break;
+
+              case ProtocolStatus.SUCCESS:        // got a page
+                pstatus = output(fit.url, fit.datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS);
+                updateStatus(content.getContent().length);
+                if (pstatus != null && pstatus.isSuccess() &&
+                    pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
+                  String newUrl = pstatus.getMessage();
+                  newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
+                  newUrl = this.urlFilters.filter(newUrl);
+                  if (newUrl != null && !newUrl.equals(fit.url.toString())) {
+                    output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_REDIR_PERM);
+                    Text redirUrl = new Text(newUrl);
+                    if (maxRedirect > 0) {
+                      redirecting = true;
+                      redirectCount++;
+                      fit = FetchItem.create(redirUrl, new CrawlDatum(), byIP);
+                      FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
+                      fiq.addInProgressFetchItem(fit);
+                      if (LOG.isDebugEnabled()) {
+                        LOG.debug(" - content redirect to " + redirUrl + " (fetching now)");
+                      }
+                    } else {
+                      output(redirUrl, new CrawlDatum(), null, null, CrawlDatum.STATUS_LINKED);
+                      if (LOG.isDebugEnabled()) {
+                        LOG.debug(" - content redirect to " + redirUrl + " (fetching later)");
+                      }
+                    }
+                  } else if (LOG.isDebugEnabled()) {
+                    LOG.debug(" - content redirect skipped: " +
+                        (newUrl != null ? "to same url" : "filtered"));
+                  }
+                }
+                break;
"to same url" : "filtered")); + } + } + break; + + case ProtocolStatus.MOVED: // redirect + case ProtocolStatus.TEMP_MOVED: + int code; + if (status.getCode() == ProtocolStatus.MOVED) { + code = CrawlDatum.STATUS_FETCH_REDIR_PERM; + } else { + code = CrawlDatum.STATUS_FETCH_REDIR_TEMP; + } + output(fit.url, fit.datum, content, status, code); + String newUrl = status.getMessage(); + newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER); + newUrl = this.urlFilters.filter(newUrl); + if (newUrl != null && !newUrl.equals(fit.url.toString())) { + Text redirUrl = new Text(newUrl); + if (maxRedirect > 0) { + redirecting = true; + redirectCount++; + fit = FetchItem.create(redirUrl, new CrawlDatum(), byIP); + FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID); + fiq.addInProgressFetchItem(fit); + if (LOG.isDebugEnabled()) { + LOG.debug(" - protocol redirect to " + redirUrl + " (fetching now)"); + } + } else { + output(redirUrl, new CrawlDatum(), null, null, CrawlDatum.STATUS_LINKED); + if (LOG.isDebugEnabled()) { + LOG.debug(" - protocol redirect to " + redirUrl + " (fetching later)"); + } + } + } else if (LOG.isDebugEnabled()) { + LOG.debug(" - protocol redirect skipped: " + + (newUrl != null ? "to same url" : "filtered")); + } + break; + + case ProtocolStatus.EXCEPTION: + logError(fit.url, status.getMessage()); + /* FALLTHROUGH */ + case ProtocolStatus.RETRY: // retry + fit.datum.setRetriesSinceFetch(fit.datum.getRetriesSinceFetch()+1); + /* FALLTHROUGH */ + // intermittent blocking - retry without increasing the counter + case ProtocolStatus.BLOCKED: + output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY); + break; + + case ProtocolStatus.GONE: // gone + case ProtocolStatus.NOTFOUND: + case ProtocolStatus.ACCESS_DENIED: + case ProtocolStatus.ROBOTS_DENIED: + case ProtocolStatus.NOTMODIFIED: + output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_GONE); + break; + + default: + if (LOG.isWarnEnabled()) { + LOG.warn("Unknown ProtocolStatus: " + status.getCode()); + } + output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_GONE); + } + + if (redirecting && redirectCount >= maxRedirect) { + fetchQueues.finishFetchItem(fit); + if (LOG.isInfoEnabled()) { + LOG.info(" - redirect count exceeded " + fit.url); + } + output(fit.url, fit.datum, null, ProtocolStatus.STATUS_REDIR_EXCEEDED, CrawlDatum.STATUS_FETCH_GONE); + } + + } while (redirecting && (redirectCount < maxRedirect)); + + } catch (Throwable t) { // unexpected exception + // unblock + fetchQueues.finishFetchItem(fit); + logError(fit.url, t.toString()); + output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED, CrawlDatum.STATUS_FETCH_RETRY); + } + } + + } catch (Throwable e) { + if (LOG.isFatalEnabled()) { + e.printStackTrace(LogUtil.getFatalStream(LOG)); + LOG.fatal("fetcher caught:"+e.toString()); + } + } finally { + if (fit != null) fetchQueues.finishFetchItem(fit); + activeThreads.decrementAndGet(); // count threads + LOG.info("-finishing thread " + getName() + ", activeThreads=" + activeThreads); + } + } + + private void logError(Text url, String message) { + if (LOG.isInfoEnabled()) { + LOG.info("fetch of " + url + " failed with: " + message); + } + errors.incrementAndGet(); + } + + private ParseStatus output(Text key, CrawlDatum datum, + Content content, ProtocolStatus pstatus, int status) { + + datum.setStatus(status); + datum.setFetchTime(System.currentTimeMillis()); + if (pstatus != null) datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus); + + if 
+
+      if (content == null) {
+        String url = key.toString();
+        content = new Content(url, url, new byte[0], "", new Metadata(), this.conf);
+      }
+      Metadata metadata = content.getMetadata();
+      // add segment to metadata
+      metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);
+      // add score to content metadata so that ParseSegment can pick it up.
+      try {
+        scfilters.passScoreBeforeParsing(key, datum, content);
+      } catch (Exception e) {
+        if (LOG.isWarnEnabled()) {
+          e.printStackTrace(LogUtil.getWarnStream(LOG));
+          LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+        }
+      }
+
+      Parse parse = null;
+      if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
+        ParseStatus parseStatus;
+        try {
+          parse = this.parseUtil.parse(content);
+          parseStatus = parse.getData().getStatus();
+        } catch (Exception e) {
+          parseStatus = new ParseStatus(e);
+        }
+        if (!parseStatus.isSuccess()) {
+          if (LOG.isWarnEnabled()) {
+            LOG.warn("Error parsing: " + key + ": " + parseStatus);
+          }
+          parse = parseStatus.getEmptyParse(getConf());
+        }
+        // Calculate page signature. For non-parsing fetchers this will
+        // be done in ParseSegment
+        byte[] signature = SignatureFactory.getSignature(getConf()).calculate(content, parse);
+        metadata.set(Nutch.SIGNATURE_KEY, StringUtil.toHexString(signature));
+        datum.setSignature(signature);
+        // Ensure segment name and score are in parseData metadata
+        parse.getData().getContentMeta().set(Nutch.SEGMENT_NAME_KEY, segmentName);
+        parse.getData().getContentMeta().set(Nutch.SIGNATURE_KEY, StringUtil.toHexString(signature));
+        try {
+          scfilters.passScoreAfterParsing(key, content, parse);
+        } catch (Exception e) {
+          if (LOG.isWarnEnabled()) {
+            e.printStackTrace(LogUtil.getWarnStream(LOG));
+            LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+          }
+        }
+      }
+
+      try {
+        output.collect(key,
+                       new FetcherOutput(datum,
+                                         storingContent ? content : null,
+                                         parse != null ? new ParseImpl(parse) : null));
+      } catch (IOException e) {
+        if (LOG.isFatalEnabled()) {
+          e.printStackTrace(LogUtil.getFatalStream(LOG));
+          LOG.fatal("fetcher caught:" + e.toString());
+        }
+      }
+      if (parse != null) return parse.getData().getStatus();
+      else return null;
+    }
+
+  }
+
+  public Fetcher2() { super(null); }
+
+  public Fetcher2(Configuration conf) { super(conf); }
+
+  private void updateStatus(int bytesInPage) throws IOException {
+    pages.incrementAndGet();
+    bytes.addAndGet(bytesInPage);
+  }
+
+
+  private void reportStatus() throws IOException {
+    String status;
+    long elapsed = (System.currentTimeMillis() - start)/1000;
+    status =
+      pages + " pages, " + errors + " errors, "
+      + Math.round(((float)pages.get()*10)/elapsed)/10.0 + " pages/s, "
+      + Math.round(((((float)bytes.get())*8)/1024)/elapsed) + " kb/s, ";
+    reporter.setStatus(status);
+  }
+
+  public void configure(JobConf job) {
+    setConf(job);
+
+    this.segmentName = job.get(Nutch.SEGMENT_NAME_KEY);
+    this.storingContent = isStoringContent(job);
+    this.parsing = isParsing(job);
+
+//    if (job.getBoolean("fetcher.verbose", false)) {
+//      LOG.setLevel(Level.FINE);
+//    }
+  }
+
+  public void close() {}
+
+  public static boolean isParsing(Configuration conf) {
+    return conf.getBoolean("fetcher.parse", true);
+  }
+
+  public static boolean isStoringContent(Configuration conf) {
+    return conf.getBoolean("fetcher.store.content", true);
+  }
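As a quick sanity check of the reportStatus() arithmetic: with 120 pages, 3 errors, 1,048,576 bytes fetched and 60 seconds elapsed, the formulas yield 2.0 pages/s and 137 kb/s (kilobits, since bytes are multiplied by 8 before dividing by 1024). A standalone sketch of the same computation:

    public class StatusMathDemo {
      public static void main(String[] args) {
        int pages = 120, errors = 3;
        long bytes = 1048576L; // 1 MiB fetched
        long elapsed = 60;     // seconds since the fetcher started
        // Same formulas as Fetcher2.reportStatus():
        double pagesPerSec = Math.round(((float) pages * 10) / elapsed) / 10.0;
        long kbitPerSec = Math.round(((((float) bytes) * 8) / 1024) / elapsed);
        System.out.println(pages + " pages, " + errors + " errors, "
            + pagesPerSec + " pages/s, " + kbitPerSec + " kb/s");
        // prints: 120 pages, 3 errors, 2.0 pages/s, 137 kb/s
      }
    }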
+
+  public void run(RecordReader input, OutputCollector output,
+                  Reporter reporter) throws IOException {
+
+    this.output = output;
+    this.reporter = reporter;
+    this.fetchQueues = new FetchItemQueues(getConf());
+
+    int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
+    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: threads: " + threadCount); }
+
+    feeder = new QueueFeeder(input, fetchQueues, threadCount * 50);
+    //feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);
+    feeder.start();
+
+    // set non-blocking & no-robots mode for HTTP protocol plugins.
+    getConf().setBoolean("http.plugin.check.blocking", false);
+    getConf().setBoolean("http.plugin.check.robots", false);
+
+    for (int i = 0; i < threadCount; i++) {       // spawn threads
+      new FetcherThread(getConf()).start();
+    }
+
+    // select a timeout that avoids a task timeout
+    long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/2;
+
+    do {                                          // wait for threads to exit
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {}
+
+      reportStatus();
+      LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
+          + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize());
+
+      if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
+        fetchQueues.dump();
+      }
+      // some requests seem to hang, despite all intentions
+      if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
+        if (LOG.isWarnEnabled()) {
+          LOG.warn("Aborting with " + activeThreads + " hung threads.");
+        }
+        return;
+      }
+
+    } while (activeThreads.get() > 0);
+    LOG.info("-activeThreads=" + activeThreads);
+
+  }
+
+  public void fetch(Path segment, int threads, boolean parsing)
+    throws IOException {
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Fetcher: starting");
+      LOG.info("Fetcher: segment: " + segment);
+    }
+
+    JobConf job = new NutchJob(getConf());
+    job.setJobName("fetch " + segment);
+
+    job.setInt("fetcher.threads.fetch", threads);
+    job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());
+    job.setBoolean("fetcher.parse", parsing);
+
+    // for politeness, don't permit parallel execution of a single task
+    job.setSpeculativeExecution(false);
+
+    job.setInputPath(new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
+    job.setInputFormat(InputFormat.class);
+
+    job.setMapRunnerClass(Fetcher2.class);
+
+    job.setOutputPath(segment);
+    job.setOutputFormat(FetcherOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(FetcherOutput.class);
+
+    JobClient.runJob(job);
+    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: done"); }
+  }
+
+
+  /** Run the fetcher. */
+  public static void main(String[] args) throws Exception {
+
+    String usage = "Usage: Fetcher2 <segment> [-threads n] [-noParsing]";
+
+    if (args.length < 1) {
+      System.err.println(usage);
+      System.exit(-1);
+    }
+
+    Path segment = new Path(args[0]);
+
+    Configuration conf = NutchConfiguration.create();
+
+    int threads = conf.getInt("fetcher.threads.fetch", 10);
+    boolean parsing = true;
+
+    for (int i = 1; i < args.length; i++) {       // parse command line
+      if (args[i].equals("-threads")) {           // found -threads option
+        threads = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-noParsing")) parsing = false;
+    }
+
+    conf.setInt("fetcher.threads.fetch", threads);
+    if (!parsing) {
+      conf.setBoolean("fetcher.parse", parsing);
+    }
+    Fetcher2 fetcher = new Fetcher2(conf);        // make a Fetcher
+
+    fetcher.fetch(segment, threads, parsing);     // run the Fetcher
+
+  }
+}

Propchange: lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher2.java
------------------------------------------------------------------------------
    svn:eol-style = native
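Finally, a note on the hang detection in run() above: the monitor loop aborts once no request has started for half of mapred.task.timeout, presumably so the map task can exit cleanly before Hadoop itself kills it for inactivity. A scaled-down sketch of that guard, with timings shortened so the demo finishes quickly:

    public class HangGuardDemo {
      public static void main(String[] args) throws InterruptedException {
        // In Fetcher2.run(): timeout = mapred.task.timeout (default 10 min) / 2.
        long timeout = 2000;                                // stands in for the 5-minute abort limit
        long lastRequestStart = System.currentTimeMillis(); // updated whenever a fetch begins
        while (true) {
          Thread.sleep(500);                                // the 1 s monitoring loop, scaled down
          if (System.currentTimeMillis() - lastRequestStart > timeout) {
            System.out.println("Aborting with hung threads (no request for " + timeout + " ms)");
            break;                                          // Fetcher2 returns, ending the task cleanly
          }
        }
      }
    }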