Author: ab
Date: Tue Dec 1 14:50:15 2009
New Revision: 885776
URL: http://svn.apache.org/viewvc?rev=885776&view=rev
Log: NUTCH-770 Timebomb for Fetcher.
Modified: lucene/nutch/trunk/conf/nutch-default.xml lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java Modified: lucene/nutch/trunk/conf/nutch-default.xml URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/conf/nutch-default.xml?rev=885776&r1=885775&r2=885776&view=diff ============================================================================== --- lucene/nutch/trunk/conf/nutch-default.xml (original) +++ lucene/nutch/trunk/conf/nutch-default.xml Tue Dec 1 14:50:15 2009 @@ -601,6 +601,15 @@ <description>If true, fetcher will store content.</description> </property> +<property> + <name>fetcher.timelimit.mins</name> + <value>-1</value> + <description>This is the number of minutes allocated to the fetching. + Once this value is reached, any remaining entry from the input URL list is skipped + and all active queues are emptied. The default value of -1 deactivates the time limit. + </description> +</property> + <!-- indexer properties --> <property> @@ -1277,4 +1286,14 @@ </description> </property> +<!-- solr index properties --> +<property> + <name>solrindex.mapping.file</name> + <value>solrindex-mapping.xml</value> + <description> + Defines the name of the file that will be used in the mapping of internal + nutch field names to solr index fields as specified in the target Solr schema. 
+ </description> +</property> + </configuration> Modified: lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java?rev=885776&r1=885775&r2=885776&view=diff ============================================================================== --- lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java (original) +++ lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java Tue Dec 1 14:50:15 2009 @@ -222,6 +222,12 @@ setEndTime(System.currentTimeMillis() - crawlDelay); } + public synchronized int emptyQueue() { + int presize = queue.size(); + queue.clear(); + return presize; + } + public int getQueueSize() { return queue.size(); } @@ -299,6 +305,7 @@ boolean byIP; long crawlDelay; long minCrawlDelay; + long timelimit = -1; Configuration conf; public FetchItemQueues(Configuration conf) { @@ -308,6 +315,7 @@ this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", false); this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000); this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000); + this.timelimit = conf.getLong("fetcher.timelimit.mins", -1); } public int getTotalSize() { @@ -371,6 +379,29 @@ return null; } + // called only once the feeder has stopped + public synchronized int checkTimelimit() { + int count = 0; + if (System.currentTimeMillis() >= timelimit && timelimit != -1) { + // emptying the queues + for (String id : queues.keySet()) { + FetchItemQueue fiq = queues.get(id); + if (fiq.getQueueSize() == 0) continue; + LOG.info("* queue: " + id + " >> timelimit! 
"); + int deleted = fiq.emptyQueue(); + for (int i = 0; i < deleted; i++) { + totalSize.decrementAndGet(); + } + count += deleted; + } + // there might also be a case where totalsize !=0 but number of queues + // == 0 + // in which case we simply force it to 0 to avoid blocking + if (totalSize.get() != 0 && queues.size() == 0) totalSize.set(0); + } + return count; + } + public synchronized void dump() { for (String id : queues.keySet()) { FetchItemQueue fiq = queues.get(id); @@ -389,6 +420,7 @@ private RecordReader<Text, CrawlDatum> reader; private FetchItemQueues queues; private int size; + private long timelimit = -1; public QueueFeeder(RecordReader<Text, CrawlDatum> reader, FetchItemQueues queues, int size) { @@ -399,11 +431,29 @@ this.setName("QueueFeeder"); } + public void setTimeLimit(long tl) { + timelimit = tl; + } + public void run() { boolean hasMore = true; int cnt = 0; - + int timelimitcount = 0; while (hasMore) { + if (System.currentTimeMillis() >= timelimit && timelimit != -1) { + // enough .. 
lets' simply + // read all the entries from the input without processing them + try { + Text url = new Text(); + CrawlDatum datum = new CrawlDatum(); + hasMore = reader.next(url, datum); + timelimitcount++; + } catch (IOException e) { + LOG.fatal("QueueFeeder error reading input, record " + cnt, e); + return; + } + continue; + } int feed = size - queues.getTotalSize(); if (feed <= 0) { // queues are full - spin-wait until they have some free space @@ -430,7 +480,8 @@ } } } - LOG.info("QueueFeeder finished: total " + cnt + " records."); + LOG.info("QueueFeeder finished: total " + cnt + " records + hit by time limit :" + + timelimitcount); } } @@ -899,6 +950,10 @@ feeder = new QueueFeeder(input, fetchQueues, threadCount * 50); //feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2); + + // the value of the time limit is either -1 or the time where it should finish + long timelimit = getConf().getLong("fetcher.timelimit.mins", -1); + if (timelimit != -1) feeder.setTimeLimit(timelimit); feeder.start(); // set non-blocking & no-robots mode for HTTP protocol plugins. 
@@ -924,6 +979,14 @@ if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) { fetchQueues.dump(); } + + // check timelimit + if (!feeder.isAlive()) { + int hitByTimeLimit = fetchQueues.checkTimelimit(); + if (hitByTimeLimit != 0) reporter.incrCounter("FetcherStatus", + "hitByTimeLimit", hitByTimeLimit); + } + // some requests seem to hang, despite all intentions if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) { if (LOG.isWarnEnabled()) { @@ -947,6 +1010,16 @@ LOG.info("Fetcher: segment: " + segment); } + // set the actual time for the timelimit relative + // to the beginning of the whole job and not of a specific task + // otherwise it keeps trying again if a task fails + long timelimit = getConf().getLong("fetcher.timelimit.mins", -1); + if (timelimit != -1) { + timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000); + LOG.info("Fetcher Timelimit set for : " + timelimit); + getConf().setLong("fetcher.timelimit.mins", timelimit); + } + JobConf job = new NutchJob(getConf()); job.setJobName("fetch " + segment);