This is an automated email from the ASF dual-hosted git repository.

markus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cdd095  NUTCH-2445 Fetcher following outlinks to keep track of already fetched items
0cdd095 is described below

commit 0cdd095c881eed52dc461e559ce6ae278e99157f
Author: Markus Jelsma <mar...@apache.org>
AuthorDate: Mon Oct 23 15:59:13 2017 +0200

    NUTCH-2445 Fetcher following outlinks to keep track of already fetched items
---
 .../org/apache/nutch/fetcher/FetchItemQueue.java   |  6 ++++
 .../org/apache/nutch/fetcher/FetcherThread.java    | 41 ++++++++++++++--------
 2 files changed, 32 insertions(+), 15 deletions(-)

diff --git a/src/java/org/apache/nutch/fetcher/FetchItemQueue.java 
b/src/java/org/apache/nutch/fetcher/FetchItemQueue.java
index b67be74..5096b37 100644
--- a/src/java/org/apache/nutch/fetcher/FetchItemQueue.java
+++ b/src/java/org/apache/nutch/fetcher/FetchItemQueue.java
@@ -22,6 +22,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
@@ -51,6 +53,10 @@ public class FetchItemQueue {
   Text cookie;
   Text variableFetchDelayKey = new Text("_variableFetchDelay_");
   boolean variableFetchDelaySet = false;
+  // keep track of duplicates if fetcher.follow.outlinks.depth > 0. Some urls 
may 
+  // not get followed due to hash collisions. Hashing is used to reduce memory
+  // usage.
+  Set<Integer> alreadyFetched = new HashSet<>();
   
   public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay,
       long minCrawlDelay) {
diff --git a/src/java/org/apache/nutch/fetcher/FetcherThread.java 
b/src/java/org/apache/nutch/fetcher/FetcherThread.java
index 77947b6..42d5d50 100644
--- a/src/java/org/apache/nutch/fetcher/FetcherThread.java
+++ b/src/java/org/apache/nutch/fetcher/FetcherThread.java
@@ -198,7 +198,7 @@ public class FetcherThread extends Thread {
           + " - forcing to byHost");
       queueMode = FetchItemQueues.QUEUE_MODE_HOST;
     }
-    LOG.info("Using queue mode : " + queueMode);
+    LOG.info(getName() + " " + Thread.currentThread().getId() + " Using queue 
mode : " + queueMode);
     this.maxRedirect = conf.getInt("http.redirect.max", 3);
 
     maxOutlinksPerPage = conf.getInt("db.max.outlinks.per.page", 100);
@@ -219,7 +219,7 @@ public class FetcherThread extends Thread {
       if (storingContent) {
         robotsTxtContent = new LinkedList<>();
       } else {
-        LOG.warn("Ignoring fetcher.store.robotstxt because not storing content 
(fetcher.store.content)!");
+        LOG.warn(getName() + " " + Thread.currentThread().getId() + " Ignoring 
fetcher.store.robotstxt because not storing content (fetcher.store.content)!");
       }
     }
   }
@@ -262,7 +262,7 @@ public class FetcherThread extends Thread {
             continue;
           } else {
             // all done, finish this thread
-            LOG.info("Thread " + getName() + " has no more work available");
+            LOG.info(getName() + " " + Thread.currentThread().getId() + " has 
no more work available");
             return;
           }
         }
@@ -287,7 +287,7 @@ public class FetcherThread extends Thread {
           
           do {
             if (LOG.isInfoEnabled()) {
-              LOG.info("fetching " + fit.url + " (queue crawl delay="
+              LOG.info(getName() + " " + Thread.currentThread().getId() + " 
fetching " + fit.url + " (queue crawl delay="
                   + ((FetchItemQueues) 
fetchQueues).getFetchItemQueue(fit.queueID).crawlDelay
                   + "ms)");
             }
@@ -438,7 +438,7 @@ public class FetcherThread extends Thread {
 
             default:
               if (LOG.isWarnEnabled()) {
-                LOG.warn("Unknown ProtocolStatus: " + status.getCode());
+                LOG.warn(getName() + " " + Thread.currentThread().getId() + " 
Unknown ProtocolStatus: " + status.getCode());
               }
               output(fit.url, fit.datum, null, status,
                   CrawlDatum.STATUS_FETCH_RETRY);
@@ -447,7 +447,7 @@ public class FetcherThread extends Thread {
             if (redirecting && redirectCount > maxRedirect) {
               ((FetchItemQueues) fetchQueues).finishFetchItem(fit);
               if (LOG.isInfoEnabled()) {
-                LOG.info(" - redirect count exceeded " + fit.url);
+                LOG.info(getName() + " " + Thread.currentThread().getId() + "  
- redirect count exceeded " + fit.url);
               }
               output(fit.url, fit.datum, null,
                   ProtocolStatus.STATUS_REDIR_EXCEEDED,
@@ -473,7 +473,7 @@ public class FetcherThread extends Thread {
       if (fit != null)
         ((FetchItemQueues) fetchQueues).finishFetchItem(fit);
       activeThreads.decrementAndGet(); // count threads
-      LOG.info("-finishing thread " + getName() + ", activeThreads="
+      LOG.info(getName() + " " + Thread.currentThread().getId() + " -finishing 
thread " + getName() + ", activeThreads="
           + activeThreads);
     }
   }
@@ -577,7 +577,7 @@ public class FetcherThread extends Thread {
 
   private void logError(Text url, String message) {
     if (LOG.isInfoEnabled()) {
-      LOG.info("fetch of " + url + " failed with: " + message);
+      LOG.info(getName() + " " + Thread.currentThread().getId() + " fetch of " 
+ url + " failed with: " + message);
     }
     errors.incrementAndGet();
   }
@@ -612,7 +612,7 @@ public class FetcherThread extends Thread {
         scfilters.passScoreBeforeParsing(key, datum, content);
       } catch (Exception e) {
         if (LOG.isWarnEnabled()) {
-          LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+          LOG.warn(getName() + " " + Thread.currentThread().getId() + " 
Couldn't pass score, url " + key + " (" + e + ")");
         }
       }
       /*
@@ -625,7 +625,7 @@ public class FetcherThread extends Thread {
           try {
             parseResult = this.parseUtil.parse(content);
           } catch (Exception e) {
-            LOG.warn("Error parsing: " + key + ": "
+            LOG.warn(getName() + " " + Thread.currentThread().getId() + " 
Error parsing: " + key + ": "
                 + StringUtils.stringifyException(e));
           }
         }
@@ -657,7 +657,7 @@ public class FetcherThread extends Thread {
           ParseData parseData = parse.getData();
 
           if (!parseStatus.isSuccess()) {
-            LOG.warn("Error parsing: " + key + ": " + parseStatus);
+            LOG.warn(getName() + " " + Thread.currentThread().getId() + " 
Error parsing: " + key + ": " + parseStatus);
             parse = parseStatus.getEmptyParse(conf);
           }
 
@@ -678,7 +678,7 @@ public class FetcherThread extends Thread {
             scfilters.passScoreAfterParsing(url, content, parse);
           } catch (Exception e) {
             if (LOG.isWarnEnabled()) {
-              LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+              LOG.warn(getName() + " " + Thread.currentThread().getId() + " 
Couldn't pass score, url " + key + " (" + e + ")");
             }
           }
 
@@ -740,6 +740,10 @@ public class FetcherThread extends Thread {
           }
           // Only process depth N outlinks
           if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth) {
+            FetchItem ft = FetchItem.create(url, null, queueMode);
+            FetchItemQueue queue = ((FetchItemQueues) 
fetchQueues).getFetchItemQueue(ft.queueID);
+            queue.alreadyFetched.add(url.toString().hashCode());
+
             reporter.incrCounter("FetcherOutlinks", "outlinks_detected",
                 outlinks.size());
 
@@ -766,13 +770,20 @@ public class FetcherThread extends Thread {
                 }
               }
 
-              reporter
-                  .incrCounter("FetcherOutlinks", "outlinks_following", 1);
-
+              // Already followed?
+              int urlHashCode = followUrl.hashCode();
+              if (queue.alreadyFetched.contains(urlHashCode)) {
+                continue;
+              }
+              queue.alreadyFetched.add(urlHashCode);
+              
               // Create new FetchItem with depth incremented
               FetchItem fit = FetchItem.create(new Text(followUrl),
                   new CrawlDatum(CrawlDatum.STATUS_LINKED, interval),
                   queueMode, outlinkDepth + 1);
+                  
+              reporter
+                  .incrCounter("FetcherOutlinks", "outlinks_following", 1);
               ((FetchItemQueues) fetchQueues).addFetchItem(fit);
 
               outlinkCounter++;

-- 
To stop receiving notification emails like this one, please contact
commits@nutch.apache.org.

Reply via email to