Author: cutting
Date: Thu Sep 15 10:15:16 2005
New Revision: 289282

URL: http://svn.apache.org/viewcvs?rev=289282&view=rev
Log:
Finish even when some threads hung. Improve status reports.
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java?rev=289282&r1=289281&r2=289282&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java Thu Sep 15 10:15:16 2005 @@ -59,6 +59,7 @@ private String segmentName; private int activeThreads; private int maxRedirect; + private boolean done; private long start = System.currentTimeMillis(); // start time of fetcher run @@ -70,6 +71,10 @@ private boolean parsing; private class FetcherThread extends Thread { + public FetcherThread() { + this.setDaemon(true); // don't hang JVM on exit + } + public void run() { synchronized (Fetcher.this) {activeThreads++;} // count threads @@ -82,8 +87,10 @@ break; // exit try { // get next entry from input - if (!input.next(key, datum)) + if (!input.next(key, datum)) { + done = true; break; // at eof, exit + } } catch (IOException e) { LOG.severe("fetcher caught:"+e.toString()); break; @@ -125,8 +132,9 @@ } break; - case ProtocolStatus.RETRY: // retry case ProtocolStatus.EXCEPTION: + logError(url, status.getMessage()); + case ProtocolStatus.RETRY: // retry output(key, datum, null, CrawlDatum.STATUS_FETCH_RETRY); break; @@ -152,7 +160,7 @@ } catch (Throwable t) { // unexpected exception - logError(url, t); + logError(url, t.toString()); output(key, datum, null, CrawlDatum.STATUS_FETCH_GONE); } @@ -165,9 +173,8 @@ } } - private void logError(String url, Throwable t) { - LOG.info("fetch of " + url + " failed with: " + t); - LOG.log(Level.FINE, "stack", t); // stack trace + private void logError(String url, String message) { + LOG.info("fetch of " + url + " failed with: " + 
message); synchronized (Fetcher.this) { // record failure errors++; } @@ -225,19 +232,14 @@ private synchronized void updateStatus(int bytesInPage) throws IOException { pages++; bytes += bytesInPage; + } - if ((pages % 100) == 0) { // show status every 100pp - long elapsed = (System.currentTimeMillis() - start)/1000; - String line1 = - pages+" pages, "+errors+" errors, "+bytes+" bytes, "+elapsed+" secs"; - String line2 = - + ((float)pages)/elapsed+" pages/s, " - + ((((float)bytes)*8)/1024)/elapsed+" kb/s, " - + ((float)bytes)/pages+" bytes/page"; - LOG.info( "status: "+line1); - LOG.info( "status: "+line2); - reporter.setStatus(line2); - } + private synchronized void reportStatus() throws IOException { + long elapsed = (System.currentTimeMillis() - start)/1000; + reporter.setStatus + (pages+" pages, "+errors+" errors, " + + Math.round(((float)pages*10)/elapsed)/10.0+" pages/s, " + + Math.round(((((float)bytes)*8)/1024)/elapsed)+" kb/s, "); } public void configure(JobConf job) { @@ -266,7 +268,7 @@ this.input = input; this.output = output; this.reporter = reporter; - + this.maxRedirect = getConf().getInt("http.redirect.max", 3); int threadCount = getConf().getInt("fetcher.threads.fetch", 10); @@ -278,6 +280,24 @@ try { Thread.sleep(1000); } catch (InterruptedException e) {} + + reportStatus(); + + // some threads seem to hang, despite all intentions + if (done) { // last entry read + long doneTime = System.currentTimeMillis(); + long timeout = getConf().getLong("http.timeout", 10000) * 10; + while (activeThreads > 0 + && System.currentTimeMillis()-doneTime < timeout) { + try { + Thread.sleep(1000); // wait for completion + } catch (InterruptedException e) {} + } + if (activeThreads > 0) { // abort after timeout + LOG.warning("Aborting with "+activeThreads+" hung threads."); + return; + } + } } while (activeThreads > 0);