Author: ab
Date: Mon May  8 14:52:09 2006
New Revision: 405181

URL: http://svn.apache.org/viewcvs?rev=405181&view=rev
Log:
Refactor to make it easier to use these classes programmatically.

Modified:
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java?rev=405181&r1=405180&r2=405181&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java Mon May  8 14:52:09 2006
@@ -25,6 +25,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Closeable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapFile;
@@ -55,9 +56,28 @@
  * @author Andrzej Bialecki
  * 
  */
-public class CrawlDbReader {
+public class CrawlDbReader implements Closeable {
 
  public static final Logger LOG = LogFormatter.getLogger(CrawlDbReader.class.getName());
+  
+  private MapFile.Reader[] readers = null;
+  
+  private void openReaders(String crawlDb, Configuration config) throws IOException {
+    if (readers != null) return;
+    FileSystem fs = FileSystem.get(config);
+    readers = MapFileOutputFormat.getReaders(fs, new File(crawlDb, CrawlDatum.DB_DIR_NAME), config);
+  }
+  
+  private void closeReaders() {
+    if (readers == null) return;
+    for (int i = 0; i < readers.length; i++) {
+      try {
+        readers[i].close();
+      } catch (Exception e) {
+        // ignore errors while closing individual readers
+      }
+    }
+  }
 
   public static class CrawlDbStatMapper implements Mapper {
     public void configure(JobConf job) {}
@@ -177,6 +197,10 @@
     
     public void close() {}
   }
+
+  public void close() {
+    closeReaders();
+  }
   
  public void processStatJob(String crawlDb, Configuration config) throws IOException {
     LOG.info("CrawlDb statistics start: " + crawlDb);
@@ -249,16 +273,20 @@
     LOG.info("CrawlDb statistics: done");
 
   }
-
-  public void readUrl(String crawlDb, String url, Configuration config) throws IOException {
-    FileSystem fs = FileSystem.get(config);
+  
+  public CrawlDatum get(String crawlDb, String url, Configuration config) throws IOException {
     UTF8 key = new UTF8(url);
     CrawlDatum val = new CrawlDatum();
-    MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, new File(crawlDb, CrawlDatum.DB_DIR_NAME), config);
-    Writable res = MapFileOutputFormat.getEntry(readers, new HashPartitioner(), key, val);
+    openReaders(crawlDb, config);
+    CrawlDatum res = (CrawlDatum)MapFileOutputFormat.getEntry(readers, new HashPartitioner(), key, val);
+    return res;
+  }
+
+  public void readUrl(String crawlDb, String url, Configuration config) throws IOException {
+    CrawlDatum res = get(crawlDb, url, config);
     System.out.println("URL: " + url);
     if (res != null) {
-      System.out.println(val);
+      System.out.println(res);
     } else {
       System.out.println("not found");
     }

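For reference, a minimal sketch of the programmatic use this refactoring enables; not part of the commit. The crawl db path "crawl/crawldb" and the looked-up URL are hypothetical, and configuration setup via NutchConfiguration.create() is assumed:

import org.apache.hadoop.conf.Configuration;

import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.CrawlDbReader;
import org.apache.nutch.util.NutchConfiguration;

public class CrawlDbLookupExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = NutchConfiguration.create();
    CrawlDbReader reader = new CrawlDbReader();
    try {
      // get() opens the MapFile readers once and caches them, so repeated
      // lookups against the same crawl db reuse the open readers.
      CrawlDatum datum = reader.get("crawl/crawldb", "http://www.example.com/", conf);
      System.out.println(datum != null ? datum.toString() : "not found");
    } finally {
      // CrawlDbReader now implements Closeable; this releases the cached readers.
      reader.close();
    }
  }
}
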
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java?rev=405181&r1=405180&r2=405181&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java Mon May  8 14:52:09 2006
@@ -28,6 +28,7 @@
 import org.apache.hadoop.util.LogFormatter;
 import org.apache.hadoop.mapred.*;
 
+import org.apache.nutch.net.URLFilters;
 import org.apache.nutch.parse.*;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchJob;
@@ -44,15 +45,27 @@
   private int maxInlinks;
   private boolean ignoreInternalLinks;
   
-  public static class LinkDbMerger extends MapReduceBase implements Reducer {
+  public static class Merger extends MapReduceBase implements Reducer {
     private int _maxInlinks;
+    private URLFilters filters = null;
     
     public void configure(JobConf job) {
       super.configure(job);
       _maxInlinks = job.getInt("db.max.inlinks", 10000);
+      if (job.getBoolean("linkdb.merger.urlfilters", false)) {
+        filters = new URLFilters(job);
+      }
     }
 
    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+      if (filters != null) {
+        try {
+          if (filters.filter(((UTF8)key).toString()) == null)
+            return;
+        } catch (Exception e) {
+          LOG.fine("Can't filter " + key + ": " + e);
+        }
+      }
       Inlinks inlinks = null;
       while (values.hasNext()) {
         if (inlinks == null) {
@@ -65,9 +78,19 @@
             output.collect(key, inlinks);
             return;
           }
-          inlinks.add((Inlink)it.next());
+          Inlink in = (Inlink)it.next();
+          if (filters != null) {
+            try {
+              if (filters.filter(in.getFromUrl()) == null)
+                continue;
+            } catch (Exception e) {
+              LOG.fine("Can't filter " + key + ": " + e);
+            }
+          }
+          inlinks.add(in);
         }
       }
+      if (inlinks.size() == 0) return;
       output.collect(key, inlinks);
     }
   }
@@ -205,7 +228,6 @@
     job.setInputValueClass(ParseData.class);
 
     job.setMapperClass(LinkDb.class);
-    //job.setCombinerClass(LinkDb.class);
     job.setReducerClass(LinkDb.class);
 
     job.setOutputDir(newLinkDb);
@@ -217,7 +239,7 @@
     return job;
   }
 
-  private static JobConf createMergeJob(Configuration config, File linkDb) {
+  public static JobConf createMergeJob(Configuration config, File linkDb) {
     File newLinkDb =
       new File("linkdb-merge-" + 
                Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
@@ -229,7 +251,7 @@
     job.setInputKeyClass(UTF8.class);
     job.setInputValueClass(Inlinks.class);
 
-    job.setReducerClass(LinkDbMerger.class);
+    job.setReducerClass(Merger.class);
 
     job.setOutputDir(newLinkDb);
     job.setOutputFormat(MapFileOutputFormat.class);

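Similarly, a sketch of driving the merge step from code now that createMergeJob() is public; setting "linkdb.merger.urlfilters" switches on URL filtering in the Merger reducer. The linkdb path is hypothetical, and the caller is still responsible for installing the temporary "linkdb-merge-*" output directory in place of the old linkdb, as the command-line tool does:

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

import org.apache.nutch.crawl.LinkDb;
import org.apache.nutch.util.NutchConfiguration;

public class LinkDbMergeExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = NutchConfiguration.create();
    // Opt in to URL filtering in the Merger reducer (off by default).
    conf.set("linkdb.merger.urlfilters", "true");
    // createMergeJob() writes the merged data to a temporary
    // "linkdb-merge-<random>" output directory.
    JobConf mergeJob = LinkDb.createMergeJob(conf, new File("crawl/linkdb"));
    JobClient.runJob(mergeJob);
  }
}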
