This is an automated email from the ASF dual-hosted git repository.

snagel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git


The following commit(s) were added to refs/heads/master by this push:
     new 5907604  NUTCH-2518 Cleaning up the file system after a job failure.
     new 8682b96  Merge pull request #307 from Omkar20895/NUTCH-2518
5907604 is described below

commit 5907604b341f3eda1aae924606fce9022446132c
Author: Omkar20895 <omkarreddy2...@gmail.com>
AuthorDate: Mon Apr 2 16:38:43 2018 +0530

    NUTCH-2518 Cleaning up the file system after a job failure.
---
 src/java/org/apache/nutch/crawl/CrawlDb.java       | 20 +++++---
 src/java/org/apache/nutch/crawl/CrawlDbMerger.java | 22 ++++++---
 src/java/org/apache/nutch/crawl/CrawlDbReader.java | 57 +++++++++++++++++-----
 .../org/apache/nutch/crawl/DeduplicationJob.java   | 28 +++++++++--
 src/java/org/apache/nutch/crawl/Generator.java     | 42 ++++++++++++----
 src/java/org/apache/nutch/crawl/Injector.java      | 18 ++-----
 src/java/org/apache/nutch/crawl/LinkDb.java        | 29 ++++++++---
 src/java/org/apache/nutch/crawl/LinkDbMerger.java  | 15 +++++-
 src/java/org/apache/nutch/crawl/LinkDbReader.java  | 11 ++++-
 src/java/org/apache/nutch/fetcher/Fetcher.java     |  9 +++-
 src/java/org/apache/nutch/hostdb/ReadHostDb.java   |  2 +-
 src/java/org/apache/nutch/hostdb/UpdateHostDb.java | 16 ++++--
 src/java/org/apache/nutch/indexer/CleaningJob.java |  9 +++-
 src/java/org/apache/nutch/indexer/IndexingJob.java | 11 ++++-
 src/java/org/apache/nutch/parse/ParseSegment.java  | 11 ++++-
 .../org/apache/nutch/segment/SegmentMerger.java    | 14 +++++-
 .../org/apache/nutch/segment/SegmentReader.java    |  9 +++-
 src/java/org/apache/nutch/tools/FreeGenerator.java | 11 ++++-
 .../apache/nutch/tools/arc/ArcSegmentCreator.java  |  9 +++-
 .../org/apache/nutch/tools/warc/WARCExporter.java  | 13 +++--
 src/java/org/apache/nutch/util/NutchJob.java       | 17 +++++++
 .../org/apache/nutch/util/SitemapProcessor.java    | 17 ++-----
 22 files changed, 292 insertions(+), 98 deletions(-)

diff --git a/src/java/org/apache/nutch/crawl/CrawlDb.java b/src/java/org/apache/nutch/crawl/CrawlDb.java
index a545509..05fc3c6 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDb.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDb.java
@@ -129,17 +129,23 @@ public class CrawlDb extends NutchTool implements Tool {
       LOG.info("CrawlDb update: Merging segment data into db.");
     }
 
+    FileSystem fs = crawlDb.getFileSystem(getConf());
+    Path outPath = FileOutputFormat.getOutputPath(job);
     try {
-      int complete = job.waitForCompletion(true)?0:1;
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        String message = "Crawl job did not succeed, job status:"
+            + job.getStatus().getState() + ", reason: "
+            + job.getStatus().getFailureInfo();
+        LOG.error(message);
+        NutchJob.cleanupAfterFailure(outPath, lock, fs);
+        throw new RuntimeException(message);
+      }
     } catch (IOException | InterruptedException | ClassNotFoundException e) {
-      FileSystem fs = crawlDb.getFileSystem(getConf());
-      LockUtil.removeLockFile(fs, lock);
-      Path outPath = FileOutputFormat.getOutputPath(job);
-      if (fs.exists(outPath))
-        fs.delete(outPath, true);
+      LOG.error("Crawl job failed {}", e);
+      NutchJob.cleanupAfterFailure(outPath, lock, fs);
       throw e;
     }
-    
 
     CrawlDb.install(job, crawlDb);
 
diff --git a/src/java/org/apache/nutch/crawl/CrawlDbMerger.java b/src/java/org/apache/nutch/crawl/CrawlDbMerger.java
index 35eca60..d8756fd 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDbMerger.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDbMerger.java
@@ -144,15 +144,23 @@ public class CrawlDbMerger extends Configured implements Tool {
       }
       FileInputFormat.addInputPath(job, new Path(dbs[i], CrawlDb.CURRENT_NAME));
     }
+
+    Path outPath = FileOutputFormat.getOutputPath(job);
+    FileSystem fs = outPath.getFileSystem(getConf());
     try {
-      int complete = job.waitForCompletion(true)?0:1;
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        String message = "CrawlDbMerger job did not succeed, job status:"
+            + job.getStatus().getState() + ", reason: "
+            + job.getStatus().getFailureInfo();
+        LOG.error(message);
+        NutchJob.cleanupAfterFailure(outPath, lock, fs);
+        throw new RuntimeException(message);
+      }
       CrawlDb.install(job, output);
-    } catch (IOException e) {
-      LockUtil.removeLockFile(getConf(), lock);
-      Path outPath = FileOutputFormat.getOutputPath(job);
-      FileSystem fs = outPath.getFileSystem(getConf());
-      if (fs.exists(outPath))
-        fs.delete(outPath, true);
+    } catch (IOException | InterruptedException | ClassNotFoundException e) {
+      LOG.error("CrawlDbMerge job failed {}", e);
+      NutchJob.cleanupAfterFailure(outPath, lock, fs);
       throw e;
     }
     long end = System.currentTimeMillis();
diff --git a/src/java/org/apache/nutch/crawl/CrawlDbReader.java b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
index c1a79e9..dcf5ace 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDbReader.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
@@ -401,15 +401,23 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
 
          // https://issues.apache.org/jira/browse/NUTCH-1029
          config.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
-
+          FileSystem fileSystem = tmpFolder.getFileSystem(config);
           try {
-            int complete = job.waitForCompletion(true)?0:1;
-          } catch (InterruptedException | ClassNotFoundException e) {
+            boolean success = job.waitForCompletion(true);
+            if (!success) {
+              String message = "CrawlDbReader job did not succeed, job status:"
+                  + job.getStatus().getState() + ", reason: "
+                  + job.getStatus().getFailureInfo();
+              LOG.error(message);
+              fileSystem.delete(tmpFolder, true);
+              throw new RuntimeException(message);
+            }
+          } catch (IOException | InterruptedException | ClassNotFoundException e) {
             LOG.error(StringUtils.stringifyException(e));
+            fileSystem.delete(tmpFolder, true);
             throw e;
           }
          // reading the result
-         FileSystem fileSystem = tmpFolder.getFileSystem(config);
           SequenceFile.Reader[] readers = SegmentReaderUtil.getReaders(tmpFolder, config);
 
          Text key = new Text();
@@ -684,8 +692,15 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
     job.setOutputValueClass(CrawlDatum.class);
 
     try {
-      int complete = job.waitForCompletion(true)?0:1;
-    } catch (InterruptedException | ClassNotFoundException e) {
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        String message = "CrawlDbReader job did not succeed, job status:"
+            + job.getStatus().getState() + ", reason: "
+            + job.getStatus().getFailureInfo();
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
+    } catch (IOException | InterruptedException | ClassNotFoundException e) {
       LOG.error(StringUtils.stringifyException(e));
       throw e;
     }
@@ -788,11 +803,21 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
     job.setOutputValueClass(Text.class);
 
     job.getConfiguration().setFloat("db.reader.topn.min", min);
-    
+   
+    FileSystem fs = tempDir.getFileSystem(config); 
     try{
-      int complete = job.waitForCompletion(true)?0:1;
-    } catch (InterruptedException | ClassNotFoundException e) {
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        String message = "CrawlDbReader job did not succeed, job status:"
+            + job.getStatus().getState() + ", reason: "
+            + job.getStatus().getFailureInfo();
+        LOG.error(message);
+        fs.delete(tempDir, true);
+        throw new RuntimeException(message);
+      }
+    } catch (IOException | InterruptedException | ClassNotFoundException e) {
       LOG.error(StringUtils.stringifyException(e));
+      fs.delete(tempDir, true);
       throw e;
     }
 
@@ -816,13 +841,21 @@ public class CrawlDbReader extends AbstractChecker implements Closeable {
     job.setNumReduceTasks(1); // create a single file.
 
     try{
-      int complete = job.waitForCompletion(true)?0:1;
-    } catch (InterruptedException | ClassNotFoundException e) {
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        String message = "CrawlDbReader job did not succeed, job status:"
+            + job.getStatus().getState() + ", reason: "
+            + job.getStatus().getFailureInfo();
+        LOG.error(message);
+        fs.delete(tempDir, true);
+        throw new RuntimeException(message);
+      }
+    } catch (IOException | InterruptedException | ClassNotFoundException e) {
       LOG.error(StringUtils.stringifyException(e));
+      fs.delete(tempDir, true);
       throw e;
     }
 
-    FileSystem fs = tempDir.getFileSystem(config);
     fs.delete(tempDir, true);
     if (LOG.isInfoEnabled()) {
       LOG.info("CrawlDb topN: done");
diff --git a/src/java/org/apache/nutch/crawl/DeduplicationJob.java b/src/java/org/apache/nutch/crawl/DeduplicationJob.java
index f2283ee..555f9e2 100644
--- a/src/java/org/apache/nutch/crawl/DeduplicationJob.java
+++ b/src/java/org/apache/nutch/crawl/DeduplicationJob.java
@@ -312,8 +312,17 @@ public class DeduplicationJob extends NutchTool implements Tool {
     job.setMapperClass(DBFilter.class);
     job.setReducerClass(DedupReducer.class);
 
+    FileSystem fs = tempDir.getFileSystem(getConf());
     try {
-      int complete = job.waitForCompletion(true)?0:1;
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        String message = "Crawl job did not succeed, job status:"
+            + job.getStatus().getState() + ", reason: "
+            + job.getStatus().getFailureInfo();
+        LOG.error(message);
+        fs.delete(tempDir, true);
+        throw new RuntimeException(message);
+      }
       CounterGroup g = job.getCounters().getGroup("DeduplicationJobStatus");
       if (g != null) {
         Counter counter = g.findCounter("Documents marked as duplicate");
@@ -321,8 +330,9 @@ public class DeduplicationJob extends NutchTool implements Tool {
         LOG.info("Deduplication: " + (int) dups
             + " documents marked as duplicates");
       }
-    } catch (final Exception e) {
+    } catch (IOException | InterruptedException | ClassNotFoundException e) {
       LOG.error("DeduplicationJob: " + StringUtils.stringifyException(e));
+      fs.delete(tempDir, true);
       return -1;
     }
 
@@ -337,16 +347,24 @@ public class DeduplicationJob extends NutchTool implements Tool {
     mergeJob.setReducerClass(StatusUpdateReducer.class);
 
     try {
-      int complete = job.waitForCompletion(true)?0:1;
-    } catch (final Exception e) {
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        String message = "Crawl job did not succeed, job status:"
+            + job.getStatus().getState() + ", reason: "
+            + job.getStatus().getFailureInfo();
+        LOG.error(message);
+        fs.delete(tempDir, true);
+        throw new RuntimeException(message);
+      }
+    } catch (IOException | InterruptedException | ClassNotFoundException e) {
       LOG.error("DeduplicationMergeJob: " + StringUtils.stringifyException(e));
+      fs.delete(tempDir, true);
       return -1;
     }
 
     CrawlDb.install(mergeJob, dbPath);
 
     // clean up
-    FileSystem fs = tempDir.getFileSystem(getConf());
     fs.delete(tempDir, true);
 
     long end = System.currentTimeMillis();
diff --git a/src/java/org/apache/nutch/crawl/Generator.java b/src/java/org/apache/nutch/crawl/Generator.java
index c972a13..c9096d6 100644
--- a/src/java/org/apache/nutch/crawl/Generator.java
+++ b/src/java/org/apache/nutch/crawl/Generator.java
@@ -781,10 +781,18 @@ public class Generator extends NutchTool implements Tool {
     MultipleOutputs.addNamedOutput(job, "sequenceFiles", SequenceFileOutputFormat.class, FloatWritable.class, SelectorEntry.class);
 
     try {
-      int complete = job.waitForCompletion(true)?0:1;
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        String message = "Generator job did not succeed, job status:"
+            + job.getStatus().getState() + ", reason: "
+            + job.getStatus().getFailureInfo();
+        LOG.error(message);
+        NutchJob.cleanupAfterFailure(tempDir, lock, fs);
+        throw new RuntimeException(message);
+      }
     } catch (IOException | InterruptedException | ClassNotFoundException e) {
-      LockUtil.removeLockFile(getConf(), lock);
-      fs.delete(tempDir, true);
+      LOG.error("Generator job failed {}", e);
+      NutchJob.cleanupAfterFailure(tempDir, lock, fs);
       throw e;
     }
 
@@ -838,12 +846,21 @@ public class Generator extends NutchTool implements Tool {
       job.setOutputValueClass(CrawlDatum.class);
       FileOutputFormat.setOutputPath(job, tempDir2);
       try {
-        int complete = job.waitForCompletion(true)?0:1;
+        boolean success = job.waitForCompletion(true);
+        if (!success) {
+          String message = "Generator job did not succeed, job status:"
+              + job.getStatus().getState() + ", reason: "
+              + job.getStatus().getFailureInfo();
+          LOG.error(message);
+          NutchJob.cleanupAfterFailure(tempDir, lock, fs);
+          NutchJob.cleanupAfterFailure(tempDir2, lock, fs);
+          throw new RuntimeException(message);
+        }
         CrawlDb.install(job, dbDir);
       } catch (IOException | InterruptedException | ClassNotFoundException e) {
-        LockUtil.removeLockFile(getConf(), lock);
-        fs.delete(tempDir, true);
-        fs.delete(tempDir2, true);
+        LOG.error("Generator job failed {}", e);
+        NutchJob.cleanupAfterFailure(tempDir, lock, fs);
+        NutchJob.cleanupAfterFailure(tempDir2, lock, fs);
         throw e;
       }
 
@@ -894,8 +911,15 @@ public class Generator extends NutchTool implements Tool {
     job.setOutputValueClass(CrawlDatum.class);
     job.setSortComparatorClass(HashComparator.class);
     try {
-      int complete = job.waitForCompletion(true)?0:1;
-    } catch (InterruptedException | ClassNotFoundException e) {
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        String message = "Generator job did not succeed, job status:"
+            + job.getStatus().getState() + ", reason: "
+            + job.getStatus().getFailureInfo();
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
+    } catch (IOException | InterruptedException | ClassNotFoundException e) {
       LOG.error(StringUtils.stringifyException(e));  
       throw e;
     }
diff --git a/src/java/org/apache/nutch/crawl/Injector.java b/src/java/org/apache/nutch/crawl/Injector.java
index d872dbf..093d483 100644
--- a/src/java/org/apache/nutch/crawl/Injector.java
+++ b/src/java/org/apache/nutch/crawl/Injector.java
@@ -40,9 +40,9 @@ import org.apache.nutch.net.URLFilters;
 import org.apache.nutch.net.URLNormalizers;
 import org.apache.nutch.scoring.ScoringFilterException;
 import org.apache.nutch.scoring.ScoringFilters;
-import org.apache.nutch.util.LockUtil;
 import org.apache.nutch.service.NutchServer;
 import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
 import org.apache.nutch.util.NutchTool;
 import org.apache.nutch.util.TimingUtil;
 
@@ -420,7 +420,7 @@ public class Injector extends NutchTool implements Tool {
             + job.getStatus().getState() + ", reason: "
             + job.getStatus().getFailureInfo();
         LOG.error(message);
-        cleanupAfterFailure(tempCrawlDb, lock, fs);
+        NutchJob.cleanupAfterFailure(tempCrawlDb, lock, fs);
         // throw exception so that calling routine can exit with error
         throw new RuntimeException(message);
       }
@@ -463,19 +463,7 @@ public class Injector extends NutchTool implements Tool {
       }
     } catch (IOException | InterruptedException | ClassNotFoundException e) {
       LOG.error("Injector job failed", e);
-      cleanupAfterFailure(tempCrawlDb, lock, fs);
-      throw e;
-    }
-  }
-
-  public void cleanupAfterFailure(Path tempCrawlDb, Path lock, FileSystem fs)
-      throws IOException {
-    try {
-      if (fs.exists(tempCrawlDb)) {
-        fs.delete(tempCrawlDb, true);
-      }
-      LockUtil.removeLockFile(fs, lock);
-    } catch (IOException e) {
+      NutchJob.cleanupAfterFailure(tempCrawlDb, lock, fs);
       throw e;
     }
   }
diff --git a/src/java/org/apache/nutch/crawl/LinkDb.java b/src/java/org/apache/nutch/crawl/LinkDb.java
index c6a32ba..37bfb7b 100644
--- a/src/java/org/apache/nutch/crawl/LinkDb.java
+++ b/src/java/org/apache/nutch/crawl/LinkDb.java
@@ -228,8 +228,17 @@ public class LinkDb extends NutchTool implements Tool {
               ParseData.DIR_NAME));
     }
     try {
-      int complete = job.waitForCompletion(true)?0:1;
-    } catch (IOException | InterruptedException | ClassNotFoundException  e) {
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        String message = "LinkDb job did not succeed, job status:"
+            + job.getStatus().getState() + ", reason: "
+            + job.getStatus().getFailureInfo();
+        LOG.error(message);
+        LockUtil.removeLockFile(fs, lock);
+        throw new RuntimeException(message);
+      }
+    } catch (IOException | InterruptedException | ClassNotFoundException e) {
+      LOG.error("LinkDb job failed {}", e);
       LockUtil.removeLockFile(fs, lock);
       throw e;
     }
@@ -244,10 +253,18 @@ public class LinkDb extends NutchTool implements Tool {
       FileInputFormat.addInputPath(job, currentLinkDb);
       FileInputFormat.addInputPath(job, newLinkDb);
       try {
-        int complete = job.waitForCompletion(true)?0:1;
-      } catch (IOException e) {
-        LockUtil.removeLockFile(fs, lock);
-        fs.delete(newLinkDb, true);
+        boolean success = job.waitForCompletion(true);
+        if (!success) {
+          String message = "LinkDb job did not succeed, job status:"
+              + job.getStatus().getState() + ", reason: "
+              + job.getStatus().getFailureInfo();
+          LOG.error(message);
+          NutchJob.cleanupAfterFailure(newLinkDb, lock, fs);
+          throw new RuntimeException(message);
+        }
+      } catch (IOException | InterruptedException | ClassNotFoundException e) {
+        LOG.error("LinkDb job failed {}", e);
+        NutchJob.cleanupAfterFailure(newLinkDb, lock, fs);
         throw e;
       }
       fs.delete(newLinkDb, true);
diff --git a/src/java/org/apache/nutch/crawl/LinkDbMerger.java b/src/java/org/apache/nutch/crawl/LinkDbMerger.java
index c8e3943..f2f0892 100644
--- a/src/java/org/apache/nutch/crawl/LinkDbMerger.java
+++ b/src/java/org/apache/nutch/crawl/LinkDbMerger.java
@@ -121,7 +121,20 @@ public class LinkDbMerger extends Configured implements Tool {
     for (int i = 0; i < dbs.length; i++) {
       FileInputFormat.addInputPath(job, new Path(dbs[i], LinkDb.CURRENT_NAME));
     }
-    int complete = job.waitForCompletion(true)?0:1;
+
+    try {
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        String message = "LinkDbMerge job did not succeed, job status:"
+            + job.getStatus().getState() + ", reason: "
+            + job.getStatus().getFailureInfo();
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
+    } catch (IOException | InterruptedException | ClassNotFoundException e) {
+      LOG.error("LinkDbMerge job failed {}", e);
+      throw e;
+    }
     FileSystem fs = output.getFileSystem(getConf());
     fs.mkdirs(output);
     fs.rename(FileOutputFormat.getOutputPath(job), new Path(output,
diff --git a/src/java/org/apache/nutch/crawl/LinkDbReader.java b/src/java/org/apache/nutch/crawl/LinkDbReader.java
index 8efaf0a..519fa59 100644
--- a/src/java/org/apache/nutch/crawl/LinkDbReader.java
+++ b/src/java/org/apache/nutch/crawl/LinkDbReader.java
@@ -166,8 +166,15 @@ public class LinkDbReader extends AbstractChecker implements Closeable {
     job.setOutputValueClass(Inlinks.class);
 
     try{
-      int complete = job.waitForCompletion(true)?0:1;
-    } catch (InterruptedException | ClassNotFoundException e){
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        String message = "LinkDbRead job did not succeed, job status:"
+            + job.getStatus().getState() + ", reason: "
+            + job.getStatus().getFailureInfo();
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
+    } catch (IOException | InterruptedException | ClassNotFoundException e){
       LOG.error(StringUtils.stringifyException(e));
       throw e;
     }
diff --git a/src/java/org/apache/nutch/fetcher/Fetcher.java b/src/java/org/apache/nutch/fetcher/Fetcher.java
index 34fb136..ba34d68 100644
--- a/src/java/org/apache/nutch/fetcher/Fetcher.java
+++ b/src/java/org/apache/nutch/fetcher/Fetcher.java
@@ -495,7 +495,14 @@ public class Fetcher extends NutchTool implements Tool {
     job.setOutputValueClass(NutchWritable.class);
 
     try {
-      int complete = job.waitForCompletion(true)?0:1;
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        String message = "Fetcher job did not succeed, job status:"
+            + job.getStatus().getState() + ", reason: "
+            + job.getStatus().getFailureInfo();
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
     } catch (InterruptedException | ClassNotFoundException e) {
       LOG.error(StringUtils.stringifyException(e));
       throw e;
diff --git a/src/java/org/apache/nutch/hostdb/ReadHostDb.java b/src/java/org/apache/nutch/hostdb/ReadHostDb.java
index e53e0c3..408e3ea 100644
--- a/src/java/org/apache/nutch/hostdb/ReadHostDb.java
+++ b/src/java/org/apache/nutch/hostdb/ReadHostDb.java
@@ -208,7 +208,7 @@ public class ReadHostDb extends Configured implements Tool {
         throw new RuntimeException(message);
       }
     } catch (IOException | InterruptedException | ClassNotFoundException e) {
-      LOG.error("ReadHostDb job failed", e);
+      LOG.error("ReadHostDb job failed {}", e);
       throw e;
     }
 
diff --git a/src/java/org/apache/nutch/hostdb/UpdateHostDb.java b/src/java/org/apache/nutch/hostdb/UpdateHostDb.java
index 9119c35..7209278 100644
--- a/src/java/org/apache/nutch/hostdb/UpdateHostDb.java
+++ b/src/java/org/apache/nutch/hostdb/UpdateHostDb.java
@@ -129,17 +129,23 @@ public class UpdateHostDb extends Configured implements Tool {
     conf.setClassLoader(Thread.currentThread().getContextClassLoader());
     
     try {
-      int complete = job.waitForCompletion(true)?0:1;
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        String message = "UpdateHostDb job did not succeed, job status:"
+            + job.getStatus().getState() + ", reason: "
+            + job.getStatus().getFailureInfo();
+        LOG.error(message);
+        NutchJob.cleanupAfterFailure(tempHostDb, lock, fs);
+        throw new RuntimeException(message);
+      }
 
       FSUtils.replace(fs, old, current, true);
       FSUtils.replace(fs, current, tempHostDb, true);
 
       if (!preserveBackup && fs.exists(old)) fs.delete(old, true);
     } catch (Exception e) {
-      if (fs.exists(tempHostDb)) {
-        fs.delete(tempHostDb, true);
-      }
-      LockUtil.removeLockFile(fs, lock);
+      LOG.error("UpdateHostDb job failed {}", e);
+      NutchJob.cleanupAfterFailure(tempHostDb, lock, fs);
       throw e;
     }
 
diff --git a/src/java/org/apache/nutch/indexer/CleaningJob.java b/src/java/org/apache/nutch/indexer/CleaningJob.java
index a8ac640..3ac8b9e 100644
--- a/src/java/org/apache/nutch/indexer/CleaningJob.java
+++ b/src/java/org/apache/nutch/indexer/CleaningJob.java
@@ -162,7 +162,14 @@ public class CleaningJob implements Tool {
     conf.setBoolean(IndexerMapReduce.INDEXER_DELETE, true);
 
     try{
-      int complete = job.waitForCompletion(true)?0:1;
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        String message = "CleaningJob did not succeed, job status:"
+            + job.getStatus().getState() + ", reason: "
+            + job.getStatus().getFailureInfo();
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
     } catch (InterruptedException | ClassNotFoundException e) {
       LOG.error(StringUtils.stringifyException(e));
       throw e;
diff --git a/src/java/org/apache/nutch/indexer/IndexingJob.java b/src/java/org/apache/nutch/indexer/IndexingJob.java
index e4997cb..6757291 100644
--- a/src/java/org/apache/nutch/indexer/IndexingJob.java
+++ b/src/java/org/apache/nutch/indexer/IndexingJob.java
@@ -146,8 +146,15 @@ public class IndexingJob extends NutchTool implements Tool {
     FileOutputFormat.setOutputPath(job, tmp);
     try {
       try{
-        int complete = job.waitForCompletion(true)?0:1;
-      } catch (InterruptedException | ClassNotFoundException e) {
+        boolean success = job.waitForCompletion(true);
+        if (!success) {
+          String message = "Indexing job did not succeed, job status:"
+              + job.getStatus().getState() + ", reason: "
+              + job.getStatus().getFailureInfo();
+          LOG.error(message);
+          throw new RuntimeException(message);
+        }
+      } catch (IOException | InterruptedException | ClassNotFoundException e) {
         LOG.error(StringUtils.stringifyException(e));
         throw e;
       }
diff --git a/src/java/org/apache/nutch/parse/ParseSegment.java b/src/java/org/apache/nutch/parse/ParseSegment.java
index 61aa997..1d64463 100644
--- a/src/java/org/apache/nutch/parse/ParseSegment.java
+++ b/src/java/org/apache/nutch/parse/ParseSegment.java
@@ -259,8 +259,15 @@ public class ParseSegment extends NutchTool implements Tool {
     job.setOutputValueClass(ParseImpl.class);
 
     try{
-      int complete = job.waitForCompletion(true)?0:1;
-    } catch (InterruptedException | ClassNotFoundException e) {
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        String message = "Parse job did not succeed, job status:"
+            + job.getStatus().getState() + ", reason: "
+            + job.getStatus().getFailureInfo();
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
+    } catch (IOException | InterruptedException | ClassNotFoundException e) {
       LOG.error(StringUtils.stringifyException(e));
       throw e;
     }
diff --git a/src/java/org/apache/nutch/segment/SegmentMerger.java b/src/java/org/apache/nutch/segment/SegmentMerger.java
index 780e10a..188ae69 100644
--- a/src/java/org/apache/nutch/segment/SegmentMerger.java
+++ b/src/java/org/apache/nutch/segment/SegmentMerger.java
@@ -738,7 +738,19 @@ public class SegmentMerger extends Configured implements Tool{
 
     setConf(conf);
 
-    int complete = job.waitForCompletion(true)?0:1;
+    try {
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        String message = "SegmentMerger job did not succeed, job status:"
+            + job.getStatus().getState() + ", reason: "
+            + job.getStatus().getFailureInfo();
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
+    } catch (IOException | InterruptedException | ClassNotFoundException e) {
+      LOG.error("SegmentMerger job failed {}", e);
+      throw e;
+    }
   }
 
   /**
diff --git a/src/java/org/apache/nutch/segment/SegmentReader.java b/src/java/org/apache/nutch/segment/SegmentReader.java
index 28b88cd..0b65a2b 100644
--- a/src/java/org/apache/nutch/segment/SegmentReader.java
+++ b/src/java/org/apache/nutch/segment/SegmentReader.java
@@ -244,7 +244,14 @@ public class SegmentReader extends Configured implements Tool {
     job.setOutputValueClass(NutchWritable.class);
 
     try {
-      int complete = job.waitForCompletion(true)?0:1;
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        String message = "SegmentReader job did not succeed, job status:"
+            + job.getStatus().getState() + ", reason: "
+            + job.getStatus().getFailureInfo();
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
     } catch (IOException | InterruptedException | ClassNotFoundException e ){
       LOG.error(StringUtils.stringifyException(e));
       throw e; 
diff --git a/src/java/org/apache/nutch/tools/FreeGenerator.java b/src/java/org/apache/nutch/tools/FreeGenerator.java
index 3b01bb4..ab5109e 100644
--- a/src/java/org/apache/nutch/tools/FreeGenerator.java
+++ b/src/java/org/apache/nutch/tools/FreeGenerator.java
@@ -201,8 +201,15 @@ public class FreeGenerator extends Configured implements Tool {
     FileOutputFormat.setOutputPath(job, new Path(args[1], new Path(segName,
         CrawlDatum.GENERATE_DIR_NAME)));
     try {
-      int complete = job.waitForCompletion(true)?0:1;
-    } catch (Exception e) {
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        String message = "FreeGenerator job did not succeed, job status:"
+            + job.getStatus().getState() + ", reason: "
+            + job.getStatus().getFailureInfo();
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
+    } catch (IOException | InterruptedException | ClassNotFoundException e) {
       LOG.error("FAILED: " + StringUtils.stringifyException(e));
       return -1;
     }
diff --git a/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java b/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java
index 1f9e660..499b246 100644
--- a/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java
+++ b/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java
@@ -395,7 +395,14 @@ public class ArcSegmentCreator extends Configured implements Tool {
     job.setOutputValueClass(NutchWritable.class);
 
     try {
-      int complete = job.waitForCompletion(true)?0:1;
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        String message = "ArcSegmentCreator job did not succeed, job status:"
+            + job.getStatus().getState() + ", reason: "
+            + job.getStatus().getFailureInfo();
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
     } catch (IOException | InterruptedException | ClassNotFoundException e){
       LOG.error(StringUtils.stringifyException(e));
       throw e;
diff --git a/src/java/org/apache/nutch/tools/warc/WARCExporter.java b/src/java/org/apache/nutch/tools/warc/WARCExporter.java
index aae8064..2921a97 100644
--- a/src/java/org/apache/nutch/tools/warc/WARCExporter.java
+++ b/src/java/org/apache/nutch/tools/warc/WARCExporter.java
@@ -285,13 +285,20 @@ public class WARCExporter extends Configured implements Tool {
     job.setOutputValueClass(WARCWritable.class);
 
     try {
-      int complete = job.waitForCompletion(true)?0:1;
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        String message = "WARCExporter job did not succeed, job status:"
+            + job.getStatus().getState() + ", reason: "
+            + job.getStatus().getFailureInfo();
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
       LOG.info(job.getCounters().toString());
       long end = System.currentTimeMillis();
       LOG.info("WARCExporter: finished at {}, elapsed: {}", sdf.format(end),
           TimingUtil.elapsedTime(start, end));
-    } catch (Exception e) {
-      LOG.error("Exception caught", e);
+    } catch (IOException | InterruptedException | ClassNotFoundException e) {
+      LOG.error("WARCExporter job failed {}", e);
       return -1;
     }
 
diff --git a/src/java/org/apache/nutch/util/NutchJob.java b/src/java/org/apache/nutch/util/NutchJob.java
index 34a9acd..06f1cc2 100644
--- a/src/java/org/apache/nutch/util/NutchJob.java
+++ b/src/java/org/apache/nutch/util/NutchJob.java
@@ -19,6 +19,8 @@ package org.apache.nutch.util;
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 
@@ -34,4 +36,19 @@ public class NutchJob extends Job {
     return Job.getInstance(conf);
   } 
 
+  /*
+   * Clean up the file system in case of a job failure.
+   */
+  public static void cleanupAfterFailure(Path tempDir, Path lock, FileSystem fs)
+         throws IOException {
+    try {
+      if (fs.exists(tempDir)) {
+        fs.delete(tempDir, true);
+      }
+      LockUtil.removeLockFile(fs, lock);
+    } catch (IOException e) {
+      throw e;
+    }
+  }
+
 }
diff --git a/src/java/org/apache/nutch/util/SitemapProcessor.java b/src/java/org/apache/nutch/util/SitemapProcessor.java
index 380ac07..70f4372 100644
--- a/src/java/org/apache/nutch/util/SitemapProcessor.java
+++ b/src/java/org/apache/nutch/util/SitemapProcessor.java
@@ -52,6 +52,7 @@ import org.apache.nutch.protocol.Protocol;
 import org.apache.nutch.protocol.ProtocolFactory;
 import org.apache.nutch.protocol.ProtocolOutput;
 import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.util.NutchJob;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -383,7 +384,7 @@ public class SitemapProcessor extends Configured implements Tool {
             + " job did not succeed, job status: " + job.getStatus().getState()
             + ", reason: " + job.getStatus().getFailureInfo();
         LOG.error(message);
-        cleanupAfterFailure(tempCrawlDb, lock, fs);
+        NutchJob.cleanupAfterFailure(tempCrawlDb, lock, fs);
         // throw exception so that calling routine can exit with error
         throw new RuntimeException(message);
       }
@@ -415,19 +416,7 @@ public class SitemapProcessor extends Configured implements Tool {
       }
     } catch (IOException | InterruptedException | ClassNotFoundException e) {
       LOG.error("SitemapProcessor_" + crawldb.toString(), e);
-      cleanupAfterFailure(tempCrawlDb, lock, fs);
-      throw e;
-    }
-  }
-
-  public void cleanupAfterFailure(Path tempCrawlDb, Path lock, FileSystem fs)
-      throws IOException {
-    try {
-      if (fs.exists(tempCrawlDb)) {
-        fs.delete(tempCrawlDb, true);
-      }
-      LockUtil.removeLockFile(fs, lock);
-    } catch (IOException e) {
+      NutchJob.cleanupAfterFailure(tempCrawlDb, lock, fs);
       throw e;
     }
   }

-- 
To stop receiving notification emails like this one, please contact
sna...@apache.org.

Reply via email to