This is an automated email from the ASF dual-hosted git repository. markus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nutch.git
The following commit(s) were added to refs/heads/master by this push: new 0cdd095 NUTCH-2445 Fetcher following outlinks to keep track of already fetched items 0cdd095 is described below commit 0cdd095c881eed52dc461e559ce6ae278e99157f Author: Markus Jelsma <mar...@apache.org> AuthorDate: Mon Oct 23 15:59:13 2017 +0200 NUTCH-2445 Fetcher following outlinks to keep track of already fetched items --- .../org/apache/nutch/fetcher/FetchItemQueue.java | 6 ++++ .../org/apache/nutch/fetcher/FetcherThread.java | 41 ++++++++++++++-------- 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/src/java/org/apache/nutch/fetcher/FetchItemQueue.java b/src/java/org/apache/nutch/fetcher/FetchItemQueue.java index b67be74..5096b37 100644 --- a/src/java/org/apache/nutch/fetcher/FetchItemQueue.java +++ b/src/java/org/apache/nutch/fetcher/FetchItemQueue.java @@ -22,6 +22,8 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.HashSet; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; @@ -51,6 +53,10 @@ public class FetchItemQueue { Text cookie; Text variableFetchDelayKey = new Text("_variableFetchDelay_"); boolean variableFetchDelaySet = false; + // keep track of duplicates if fetcher.follow.outlinks.depth > 0. Some urls may + // not get followed due to hash collisions. Hashing is used to reduce memory + // usage. + Set<Integer> alreadyFetched = new HashSet<>(); public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay, long minCrawlDelay) { diff --git a/src/java/org/apache/nutch/fetcher/FetcherThread.java b/src/java/org/apache/nutch/fetcher/FetcherThread.java index 77947b6..42d5d50 100644 --- a/src/java/org/apache/nutch/fetcher/FetcherThread.java +++ b/src/java/org/apache/nutch/fetcher/FetcherThread.java @@ -198,7 +198,7 @@ public class FetcherThread extends Thread { + " - forcing to byHost"); queueMode = FetchItemQueues.QUEUE_MODE_HOST; } - LOG.info("Using queue mode : " + queueMode); + LOG.info(getName() + " " + Thread.currentThread().getId() + " Using queue mode : " + queueMode); this.maxRedirect = conf.getInt("http.redirect.max", 3); maxOutlinksPerPage = conf.getInt("db.max.outlinks.per.page", 100); @@ -219,7 +219,7 @@ public class FetcherThread extends Thread { if (storingContent) { robotsTxtContent = new LinkedList<>(); } else { - LOG.warn("Ignoring fetcher.store.robotstxt because not storing content (fetcher.store.content)!"); + LOG.warn(getName() + " " + Thread.currentThread().getId() + " Ignoring fetcher.store.robotstxt because not storing content (fetcher.store.content)!"); } } } @@ -262,7 +262,7 @@ public class FetcherThread extends Thread { continue; } else { // all done, finish this thread - LOG.info("Thread " + getName() + " has no more work available"); + LOG.info(getName() + " " + Thread.currentThread().getId() + " has no more work available"); return; } } @@ -287,7 +287,7 @@ public class FetcherThread extends Thread { do { if (LOG.isInfoEnabled()) { - LOG.info("fetching " + fit.url + " (queue crawl delay=" + LOG.info(getName() + " " + Thread.currentThread().getId() + " fetching " + fit.url + " (queue crawl delay=" + ((FetchItemQueues) fetchQueues).getFetchItemQueue(fit.queueID).crawlDelay + "ms)"); } @@ -438,7 +438,7 @@ public class FetcherThread extends Thread { default: if (LOG.isWarnEnabled()) { - LOG.warn("Unknown ProtocolStatus: " + status.getCode()); + LOG.warn(getName() + " " + Thread.currentThread().getId() + " Unknown ProtocolStatus: " + status.getCode()); } output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY); @@ -447,7 +447,7 @@ public class FetcherThread extends Thread { if (redirecting && redirectCount > maxRedirect) { ((FetchItemQueues) fetchQueues).finishFetchItem(fit); if (LOG.isInfoEnabled()) { - LOG.info(" - redirect count exceeded " + fit.url); + LOG.info(getName() + " " + Thread.currentThread().getId() + " - redirect count exceeded " + fit.url); } output(fit.url, fit.datum, null, ProtocolStatus.STATUS_REDIR_EXCEEDED, @@ -473,7 +473,7 @@ public class FetcherThread extends Thread { if (fit != null) ((FetchItemQueues) fetchQueues).finishFetchItem(fit); activeThreads.decrementAndGet(); // count threads - LOG.info("-finishing thread " + getName() + ", activeThreads=" + LOG.info(getName() + " " + Thread.currentThread().getId() + " -finishing thread " + getName() + ", activeThreads=" + activeThreads); } } @@ -577,7 +577,7 @@ public class FetcherThread extends Thread { private void logError(Text url, String message) { if (LOG.isInfoEnabled()) { - LOG.info("fetch of " + url + " failed with: " + message); + LOG.info(getName() + " " + Thread.currentThread().getId() + " fetch of " + url + " failed with: " + message); } errors.incrementAndGet(); } @@ -612,7 +612,7 @@ public class FetcherThread extends Thread { scfilters.passScoreBeforeParsing(key, datum, content); } catch (Exception e) { if (LOG.isWarnEnabled()) { - LOG.warn("Couldn't pass score, url " + key + " (" + e + ")"); + LOG.warn(getName() + " " + Thread.currentThread().getId() + " Couldn't pass score, url " + key + " (" + e + ")"); } } /* @@ -625,7 +625,7 @@ public class FetcherThread extends Thread { try { parseResult = this.parseUtil.parse(content); } catch (Exception e) { - LOG.warn("Error parsing: " + key + ": " + LOG.warn(getName() + " " + Thread.currentThread().getId() + " Error parsing: " + key + ": " + StringUtils.stringifyException(e)); } } @@ -657,7 +657,7 @@ public class FetcherThread extends Thread { ParseData parseData = parse.getData(); if (!parseStatus.isSuccess()) { - LOG.warn("Error parsing: " + key + ": " + parseStatus); + LOG.warn(getName() + " " + Thread.currentThread().getId() + " Error parsing: " + key + ": " + parseStatus); parse = parseStatus.getEmptyParse(conf); } @@ -678,7 +678,7 @@ public class FetcherThread extends Thread { scfilters.passScoreAfterParsing(url, content, parse); } catch (Exception e) { if (LOG.isWarnEnabled()) { - LOG.warn("Couldn't pass score, url " + key + " (" + e + ")"); + LOG.warn(getName() + " " + Thread.currentThread().getId() + " Couldn't pass score, url " + key + " (" + e + ")"); } } @@ -740,6 +740,10 @@ public class FetcherThread extends Thread { } // Only process depth N outlinks if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth) { + FetchItem ft = FetchItem.create(url, null, queueMode); + FetchItemQueue queue = ((FetchItemQueues) fetchQueues).getFetchItemQueue(ft.queueID); + queue.alreadyFetched.add(url.toString().hashCode()); + reporter.incrCounter("FetcherOutlinks", "outlinks_detected", outlinks.size()); @@ -766,13 +770,20 @@ public class FetcherThread extends Thread { } } - reporter - .incrCounter("FetcherOutlinks", "outlinks_following", 1); - + // Already followed? + int urlHashCode = followUrl.hashCode(); + if (queue.alreadyFetched.contains(urlHashCode)) { + continue; + } + queue.alreadyFetched.add(urlHashCode); + // Create new FetchItem with depth incremented FetchItem fit = FetchItem.create(new Text(followUrl), new CrawlDatum(CrawlDatum.STATUS_LINKED, interval), queueMode, outlinkDepth + 1); + + reporter + .incrCounter("FetcherOutlinks", "outlinks_following", 1); ((FetchItemQueues) fetchQueues).addFetchItem(fit); outlinkCounter++; -- To stop receiving notification emails like this one, please contact ['"commits@nutch.apache.org" <commits@nutch.apache.org>'].