Author: dogacan
Date: Thu Apr  2 12:46:47 2009
New Revision: 761271

NUTCH-721 - Commit old fetcher as OldFetcher for now so that we can test 
Fetcher2 performance.


Added: lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/
--- lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/ (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/ Thu 
Apr  2 12:46:47 2009
@@ -0,0 +1,574 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+import java.util.Map.Entry;
+// Commons Logging imports
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.crawl.SignatureFactory;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.protocol.*;
+import org.apache.nutch.parse.*;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.*;
+/** The fetcher. Most of the work is done by plugins. */
+public class OldFetcher extends Configured implements Tool, 
MapRunnable<WritableComparable, Writable, Text, NutchWritable> { 
+  public static final Log LOG = LogFactory.getLog(OldFetcher.class);
+  public static final int PERM_REFRESH_TIME = 5;
+  public static final String CONTENT_REDIR = "content";
+  public static final String PROTOCOL_REDIR = "protocol";
+  public static class InputFormat extends 
SequenceFileInputFormat<WritableComparable, Writable> {
+    /** Don't split inputs, to keep things polite. */
+    public InputSplit[] getSplits(JobConf job, int nSplits)
+      throws IOException {
+      FileStatus[] files = listStatus(job);
+      FileSystem fs = FileSystem.get(job);
+      InputSplit[] splits = new InputSplit[files.length];
+      for (int i = 0; i < files.length; i++) {
+        FileStatus cur = files[i];
+        splits[i] = new FileSplit(cur.getPath(), 0,
+            cur.getLen(), (String[])null);
+      }
+      return splits;
+    }
+  }
+  private RecordReader<WritableComparable, Writable> input;
+  private OutputCollector<Text, NutchWritable> output;
+  private Reporter reporter;
+  private String segmentName;
+  private int activeThreads;
+  private int maxRedirect;
+  private long start = System.currentTimeMillis(); // start time of fetcher run
+  private long lastRequestStart = start;
+  private long bytes;                             // total bytes fetched
+  private int pages;                              // total pages fetched
+  private int errors;                             // total pages errored
+  private boolean storingContent;
+  private boolean parsing;
+  private class FetcherThread extends Thread {
+    private Configuration conf;
+    private URLFilters urlFilters;
+    private ScoringFilters scfilters;
+    private ParseUtil parseUtil;
+    private URLNormalizers normalizers;
+    private ProtocolFactory protocolFactory;
+    private boolean redirecting;
+    private int redirectCount;
+    private String reprUrl;
+    public FetcherThread(Configuration conf) {
+      this.setDaemon(true);                       // don't hang JVM on exit
+      this.setName("FetcherThread");              // use an informative name
+      this.conf = conf;
+      this.urlFilters = new URLFilters(conf);
+      this.scfilters = new ScoringFilters(conf);
+      this.parseUtil = new ParseUtil(conf);
+      this.protocolFactory = new ProtocolFactory(conf);
+      this.normalizers = new URLNormalizers(conf, 
+    }
+    public void run() {
+      synchronized (OldFetcher.this) {activeThreads++;} // count threads
+      try {
+        Text key = new Text();
+        CrawlDatum datum = new CrawlDatum();
+        while (true) {
+          // TODO : NUTCH-258 ...
+          // If something bad happened, then exit
+          // if (conf.getBoolean("fetcher.exit", false)) {
+          //   break;
+          // ]
+          try {                                   // get next entry from input
+            if (!, datum)) {
+              break;                              // at eof, exit
+            }
+          } catch (IOException e) {
+            if (LOG.isFatalEnabled()) {
+              e.printStackTrace(LogUtil.getFatalStream(LOG));
+              LOG.fatal("fetcher caught:"+e.toString());
+            }
+            break;
+          }
+          synchronized (OldFetcher.this) {
+            lastRequestStart = System.currentTimeMillis();
+          }
+          // url may be changed through redirects.
+          Text url = new Text(key);
+          Text reprUrlWritable =
+            (Text) datum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);
+          if (reprUrlWritable == null) {
+            reprUrl = key.toString();
+          } else {
+            reprUrl = reprUrlWritable.toString();
+          }
+          try {
+            if (LOG.isInfoEnabled()) {"fetching " + url); }
+            // fetch the page
+            redirectCount = 0;
+            do {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("redirectCount=" + redirectCount);
+              }
+              redirecting = false;
+              Protocol protocol = 
+              ProtocolOutput output = protocol.getProtocolOutput(url, datum);
+              ProtocolStatus status = output.getStatus();
+              Content content = output.getContent();
+              ParseStatus pstatus = null;
+              String urlString = url.toString();
+              if (reprUrl != null && !reprUrl.equals(urlString)) {
+                datum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
+                    new Text(reprUrl));
+              }
+              switch(status.getCode()) {
+              case ProtocolStatus.SUCCESS:        // got a page
+                pstatus = output(url, datum, content, status, 
+                updateStatus(content.getContent().length);
+                if (pstatus != null && pstatus.isSuccess() &&
+                        pstatus.getMinorCode() == 
+                  String newUrl = pstatus.getMessage();
+                  int refreshTime = Integer.valueOf(pstatus.getArgs()[1]);
+                  url = handleRedirect(url, datum, urlString, newUrl,
+                                       refreshTime < PERM_REFRESH_TIME,
+                                       CONTENT_REDIR);
+                }
+                break;
+              case ProtocolStatus.MOVED:         // redirect
+              case ProtocolStatus.TEMP_MOVED:
+                int code;
+                boolean temp;
+                if (status.getCode() == ProtocolStatus.MOVED) {
+                  code = CrawlDatum.STATUS_FETCH_REDIR_PERM;
+                  temp = false;
+                } else {
+                  code = CrawlDatum.STATUS_FETCH_REDIR_TEMP;
+                  temp = true;
+                }
+                output(url, datum, content, status, code);
+                String newUrl = status.getMessage();
+                url = handleRedirect(url, datum, urlString, newUrl,
+                                     temp, PROTOCOL_REDIR);
+                break;
+              // failures - increase the retry counter
+              case ProtocolStatus.EXCEPTION:
+                logError(url, status.getMessage());
+              /* FALLTHROUGH */
+              case ProtocolStatus.RETRY:          // retry
+              case ProtocolStatus.WOULDBLOCK:
+              case ProtocolStatus.BLOCKED:
+                output(url, datum, null, status, 
+                break;
+              // permanent failures
+              case ProtocolStatus.GONE:           // gone
+              case ProtocolStatus.NOTFOUND:
+              case ProtocolStatus.ACCESS_DENIED:
+              case ProtocolStatus.ROBOTS_DENIED:
+                output(url, datum, null, status, CrawlDatum.STATUS_FETCH_GONE);
+                break;
+              case ProtocolStatus.NOTMODIFIED:
+                output(url, datum, null, status, 
+                break;
+              default:
+                if (LOG.isWarnEnabled()) {
+                  LOG.warn("Unknown ProtocolStatus: " + status.getCode());
+                }
+                output(url, datum, null, status, CrawlDatum.STATUS_FETCH_GONE);
+              }
+              if (redirecting && redirectCount >= maxRedirect) {
+                if (LOG.isInfoEnabled()) {
+        " - redirect count exceeded " + url);
+                }
+                output(url, datum, null, status, CrawlDatum.STATUS_FETCH_GONE);
+              }
+            } while (redirecting && (redirectCount < maxRedirect));
+          } catch (Throwable t) {                 // unexpected exception
+            logError(url, t.toString());
+            output(url, datum, null, null, CrawlDatum.STATUS_FETCH_RETRY);
+          }
+        }
+      } catch (Throwable e) {
+        if (LOG.isFatalEnabled()) {
+          e.printStackTrace(LogUtil.getFatalStream(LOG));
+          LOG.fatal("fetcher caught:"+e.toString());
+        }
+      } finally {
+        synchronized (OldFetcher.this) {activeThreads--;} // count threads
+      }
+    }
+    private Text handleRedirect(Text url, CrawlDatum datum,
+                                String urlString, String newUrl,
+                                boolean temp, String redirType)
+    throws MalformedURLException, URLFilterException {
+      newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
+      newUrl = urlFilters.filter(newUrl);
+      if (newUrl != null && !newUrl.equals(urlString)) {
+        reprUrl = URLUtil.chooseRepr(reprUrl, newUrl, temp);
+        url = new Text(newUrl);
+        if (maxRedirect > 0) {
+          redirecting = true;
+          redirectCount++;
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(" - " + redirType + " redirect to " +
+                      url + " (fetching now)");
+          }
+          return url;
+        } else {
+          CrawlDatum newDatum = new CrawlDatum();
+          if (reprUrl != null) {
+            newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
+                new Text(reprUrl));
+          }
+          output(url, newDatum, null, null, CrawlDatum.STATUS_LINKED);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(" - " + redirType + " redirect to " +
+                      url + " (fetching later)");
+          }
+          return null;
+        }
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(" - " + redirType + " redirect skipped: " +
+              (newUrl != null ? "to same url" : "filtered"));
+        }
+        return null;
+      }
+    }
+    private void logError(Text url, String message) {
+      if (LOG.isInfoEnabled()) {
+"fetch of " + url + " failed with: " + message);
+      }
+      synchronized (OldFetcher.this) {               // record failure
+        errors++;
+      }
+    }
+    private ParseStatus output(Text key, CrawlDatum datum,
+                        Content content, ProtocolStatus pstatus, int status) {
+      datum.setStatus(status);
+      datum.setFetchTime(System.currentTimeMillis());
+      if (pstatus != null) 
datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
+      ParseResult parseResult = null;
+      if (content != null) {
+        Metadata metadata = content.getMetadata();
+        // add segment to metadata
+        metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);
+        // add score to content metadata so that ParseSegment can pick it up.
+        try {
+          scfilters.passScoreBeforeParsing(key, datum, content);
+        } catch (Exception e) {
+          if (LOG.isWarnEnabled()) {
+            e.printStackTrace(LogUtil.getWarnStream(LOG));
+            LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+          }
+        }
+        /* Note: Fetcher will only follow meta-redirects coming from the
+         * original URL. */ 
+        if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
+          try {
+            parseResult = this.parseUtil.parse(content);
+          } catch (Exception e) {
+            LOG.warn("Error parsing: " + key + ": " + 
+          }
+          if (parseResult == null) {
+            byte[] signature = 
+              SignatureFactory.getSignature(getConf()).calculate(content, 
+                  new ParseStatus().getEmptyParse(conf));
+            datum.setSignature(signature);
+          }
+        }
+        /* Store status code in content So we can read this value during 
+         * parsing (as a separate job) and decide to parse or not.
+         */
+        content.getMetadata().add(Nutch.FETCH_STATUS_KEY, 
+      }
+      try {
+        output.collect(key, new NutchWritable(datum));
+        if (content != null && storingContent)
+          output.collect(key, new NutchWritable(content));
+        if (parseResult != null) {
+          for (Entry<Text, Parse> entry : parseResult) {
+            Text url = entry.getKey();
+            Parse parse = entry.getValue();
+            ParseStatus parseStatus = parse.getData().getStatus();
+            if (!parseStatus.isSuccess()) {
+              LOG.warn("Error parsing: " + key + ": " + parseStatus);
+              parse = parseStatus.getEmptyParse(getConf());
+            }
+            // Calculate page signature. For non-parsing fetchers this will
+            // be done in ParseSegment
+            byte[] signature = 
+              SignatureFactory.getSignature(getConf()).calculate(content, 
+            // Ensure segment name and score are in parseData metadata
+            parse.getData().getContentMeta().set(Nutch.SEGMENT_NAME_KEY, 
+                segmentName);
+            parse.getData().getContentMeta().set(Nutch.SIGNATURE_KEY, 
+                StringUtil.toHexString(signature));
+            // Pass fetch time to content meta
+            parse.getData().getContentMeta().set(Nutch.FETCH_TIME_KEY,
+                Long.toString(datum.getFetchTime()));
+            if (url.equals(key))
+              datum.setSignature(signature);
+            try {
+              scfilters.passScoreAfterParsing(url, content, parse);
+            } catch (Exception e) {
+              if (LOG.isWarnEnabled()) {
+                e.printStackTrace(LogUtil.getWarnStream(LOG));
+                LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+              }
+            }
+            output.collect(url, new NutchWritable(
+                    new ParseImpl(new ParseText(parse.getText()), 
+                                  parse.getData(), parse.isCanonical())));
+          }
+        }
+      } catch (IOException e) {
+        if (LOG.isFatalEnabled()) {
+          e.printStackTrace(LogUtil.getFatalStream(LOG));
+          LOG.fatal("fetcher caught:"+e.toString());
+        }
+      }
+      // return parse status if it exits
+      if (parseResult != null && !parseResult.isEmpty()) {
+        Parse p = parseResult.get(content.getUrl());
+        if (p != null) {
+          return p.getData().getStatus();
+        }
+      } 
+      return null;
+    }
+  }
+  private synchronized void updateStatus(int bytesInPage) throws IOException {
+    pages++;
+    bytes += bytesInPage;
+  }
+  private void reportStatus() throws IOException {
+    String status;
+    synchronized (this) {
+      long elapsed = (System.currentTimeMillis() - start)/1000;
+      status = 
+        pages+" pages, "+errors+" errors, "
+        + Math.round(((float)pages*10)/elapsed)/10.0+" pages/s, "
+        + Math.round(((((float)bytes)*8)/1024)/elapsed)+" kb/s, ";
+    }
+    reporter.setStatus(status);
+  }
+  public OldFetcher() {
+  }
+  public OldFetcher(Configuration conf) {
+    setConf(conf);
+  }
+  public void configure(JobConf job) {
+    setConf(job);
+    this.segmentName = job.get(Nutch.SEGMENT_NAME_KEY);
+    this.storingContent = isStoringContent(job);
+    this.parsing = isParsing(job);
+//    if (job.getBoolean("fetcher.verbose", false)) {
+//      LOG.setLevel(Level.FINE);
+//    }
+  }
+  public void close() {}
+  public static boolean isParsing(Configuration conf) {
+    return conf.getBoolean("fetcher.parse", true);
+  }
+  public static boolean isStoringContent(Configuration conf) {
+    return conf.getBoolean("", true);
+  }
+  public void run(RecordReader<WritableComparable, Writable> input, 
OutputCollector<Text, NutchWritable> output,
+                  Reporter reporter) throws IOException {
+    this.input = input;
+    this.output = output;
+    this.reporter = reporter;
+    this.maxRedirect = getConf().getInt("http.redirect.max", 3);
+    int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
+    if (LOG.isInfoEnabled()) {"OldFetcher: threads: " + 
threadCount); }
+    for (int i = 0; i < threadCount; i++) {       // spawn threads
+      new FetcherThread(getConf()).start();
+    }
+    // select a timeout that avoids a task timeout
+    long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/2;
+    do {                                          // wait for threads to exit
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {}
+      reportStatus();
+      // some requests seem to hang, despite all intentions
+      synchronized (this) {
+        if ((System.currentTimeMillis() - lastRequestStart) > timeout) {
+          if (LOG.isWarnEnabled()) {
+            LOG.warn("Aborting with "+activeThreads+" hung threads.");
+          }
+          return;
+        }
+      }
+    } while (activeThreads > 0);
+  }
+  public void fetch(Path segment, int threads)
+    throws IOException {
+    if (LOG.isInfoEnabled()) {
+"OldFetcher: starting");
+"OldFetcher: segment: " + segment);
+    }
+    JobConf job = new NutchJob(getConf());
+    job.setJobName("fetch " + segment);
+    job.setInt("fetcher.threads.fetch", threads);
+    job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());
+    // for politeness, don't permit parallel execution of a single task
+    job.setSpeculativeExecution(false);
+    FileInputFormat.addInputPath(job, new Path(segment, 
+    job.setInputFormat(InputFormat.class);
+    job.setMapRunnerClass(OldFetcher.class);
+    FileOutputFormat.setOutputPath(job, segment);
+    job.setOutputFormat(FetcherOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(NutchWritable.class);
+    JobClient.runJob(job);
+    if (LOG.isInfoEnabled()) {"OldFetcher: done"); }
+  }
+  /** Run the fetcher. */
+  public static void main(String[] args) throws Exception {
+    int res =, new OldFetcher(), 
+    System.exit(res);
+  }
+  public int run(String[] args) throws Exception {
+    String usage = "Usage: OldFetcher <segment> [-threads n] [-noParsing]";
+    if (args.length < 1) {
+      System.err.println(usage);
+      return -1;
+    }
+    Path segment = new Path(args[0]);
+    int threads = getConf().getInt("fetcher.threads.fetch", 10);
+    boolean parsing = true;
+    for (int i = 1; i < args.length; i++) {       // parse command line
+      if (args[i].equals("-threads")) {           // found -threads option
+        threads =  Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-noParsing")) parsing = false;
+    }
+    getConf().setInt("fetcher.threads.fetch", threads);
+    if (!parsing) {
+      getConf().setBoolean("fetcher.parse", parsing);
+    }
+    try {
+      fetch(segment, threads);              // run the Fetcher
+      return 0;
+    } catch (Exception e) {
+      LOG.fatal("OldFetcher: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }

Reply via email to