This is an automated email from the ASF dual-hosted git repository. snagel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nutch.git
The following commit(s) were added to refs/heads/master by this push: new 5907604 NUTCH-2518 Cleaning up the file system after a job failure. new 8682b96 Merge pull request #307 from Omkar20895/NUTCH-2518 5907604 is described below commit 5907604b341f3eda1aae924606fce9022446132c Author: Omkar20895 <omkarreddy2...@gmail.com> AuthorDate: Mon Apr 2 16:38:43 2018 +0530 NUTCH-2518 Cleaning up the file system after a job failure. --- src/java/org/apache/nutch/crawl/CrawlDb.java | 20 +++++--- src/java/org/apache/nutch/crawl/CrawlDbMerger.java | 22 ++++++--- src/java/org/apache/nutch/crawl/CrawlDbReader.java | 57 +++++++++++++++++----- .../org/apache/nutch/crawl/DeduplicationJob.java | 28 +++++++++-- src/java/org/apache/nutch/crawl/Generator.java | 42 ++++++++++++---- src/java/org/apache/nutch/crawl/Injector.java | 18 ++----- src/java/org/apache/nutch/crawl/LinkDb.java | 29 ++++++++--- src/java/org/apache/nutch/crawl/LinkDbMerger.java | 15 +++++- src/java/org/apache/nutch/crawl/LinkDbReader.java | 11 ++++- src/java/org/apache/nutch/fetcher/Fetcher.java | 9 +++- src/java/org/apache/nutch/hostdb/ReadHostDb.java | 2 +- src/java/org/apache/nutch/hostdb/UpdateHostDb.java | 16 ++++-- src/java/org/apache/nutch/indexer/CleaningJob.java | 9 +++- src/java/org/apache/nutch/indexer/IndexingJob.java | 11 ++++- src/java/org/apache/nutch/parse/ParseSegment.java | 11 ++++- .../org/apache/nutch/segment/SegmentMerger.java | 14 +++++- .../org/apache/nutch/segment/SegmentReader.java | 9 +++- src/java/org/apache/nutch/tools/FreeGenerator.java | 11 ++++- .../apache/nutch/tools/arc/ArcSegmentCreator.java | 9 +++- .../org/apache/nutch/tools/warc/WARCExporter.java | 13 +++-- src/java/org/apache/nutch/util/NutchJob.java | 17 +++++++ .../org/apache/nutch/util/SitemapProcessor.java | 17 ++----- 22 files changed, 292 insertions(+), 98 deletions(-) diff --git a/src/java/org/apache/nutch/crawl/CrawlDb.java b/src/java/org/apache/nutch/crawl/CrawlDb.java index a545509..05fc3c6 100644 --- a/src/java/org/apache/nutch/crawl/CrawlDb.java +++ b/src/java/org/apache/nutch/crawl/CrawlDb.java @@ -129,17 +129,23 @@ public class CrawlDb extends NutchTool implements Tool { LOG.info("CrawlDb update: Merging segment data into db."); } + FileSystem fs = crawlDb.getFileSystem(getConf()); + Path outPath = FileOutputFormat.getOutputPath(job); try { - int complete = job.waitForCompletion(true)?0:1; + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "Crawl job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + NutchJob.cleanupAfterFailure(outPath, lock, fs); + throw new RuntimeException(message); + } } catch (IOException | InterruptedException | ClassNotFoundException e) { - FileSystem fs = crawlDb.getFileSystem(getConf()); - LockUtil.removeLockFile(fs, lock); - Path outPath = FileOutputFormat.getOutputPath(job); - if (fs.exists(outPath)) - fs.delete(outPath, true); + LOG.error("Crawl job failed {}", e); + NutchJob.cleanupAfterFailure(outPath, lock, fs); throw e; } - CrawlDb.install(job, crawlDb); diff --git a/src/java/org/apache/nutch/crawl/CrawlDbMerger.java b/src/java/org/apache/nutch/crawl/CrawlDbMerger.java index 35eca60..d8756fd 100644 --- a/src/java/org/apache/nutch/crawl/CrawlDbMerger.java +++ b/src/java/org/apache/nutch/crawl/CrawlDbMerger.java @@ -144,15 +144,23 @@ public class CrawlDbMerger extends Configured implements Tool { } FileInputFormat.addInputPath(job, new Path(dbs[i], CrawlDb.CURRENT_NAME)); } + + Path outPath = FileOutputFormat.getOutputPath(job); + FileSystem fs = outPath.getFileSystem(getConf()); try { - int complete = job.waitForCompletion(true)?0:1; + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "CrawlDbMerger job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + NutchJob.cleanupAfterFailure(outPath, lock, fs); + throw new RuntimeException(message); + } CrawlDb.install(job, output); - } catch (IOException e) { - LockUtil.removeLockFile(getConf(), lock); - Path outPath = FileOutputFormat.getOutputPath(job); - FileSystem fs = outPath.getFileSystem(getConf()); - if (fs.exists(outPath)) - fs.delete(outPath, true); + } catch (IOException | InterruptedException | ClassNotFoundException e) { + LOG.error("CrawlDbMerge job failed {}", e); + NutchJob.cleanupAfterFailure(outPath, lock, fs); throw e; } long end = System.currentTimeMillis(); diff --git a/src/java/org/apache/nutch/crawl/CrawlDbReader.java b/src/java/org/apache/nutch/crawl/CrawlDbReader.java index c1a79e9..dcf5ace 100644 --- a/src/java/org/apache/nutch/crawl/CrawlDbReader.java +++ b/src/java/org/apache/nutch/crawl/CrawlDbReader.java @@ -401,15 +401,23 @@ public class CrawlDbReader extends AbstractChecker implements Closeable { // https://issues.apache.org/jira/browse/NUTCH-1029 config.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false); - + FileSystem fileSystem = tmpFolder.getFileSystem(config); try { - int complete = job.waitForCompletion(true)?0:1; - } catch (InterruptedException | ClassNotFoundException e) { + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "CrawlDbReader job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + fileSystem.delete(tmpFolder, true); + throw new RuntimeException(message); + } + } catch (IOException | InterruptedException | ClassNotFoundException e) { LOG.error(StringUtils.stringifyException(e)); + fileSystem.delete(tmpFolder, true); throw e; } // reading the result - FileSystem fileSystem = tmpFolder.getFileSystem(config); SequenceFile.Reader[] readers = SegmentReaderUtil.getReaders(tmpFolder, config); Text key = new Text(); @@ -684,8 +692,15 @@ public class CrawlDbReader extends AbstractChecker implements Closeable { job.setOutputValueClass(CrawlDatum.class); try { - int complete = job.waitForCompletion(true)?0:1; - } catch (InterruptedException | ClassNotFoundException e) { + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "CrawlDbReader job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + throw new RuntimeException(message); + } + } catch (IOException | InterruptedException | ClassNotFoundException e) { LOG.error(StringUtils.stringifyException(e)); throw e; } @@ -788,11 +803,21 @@ public class CrawlDbReader extends AbstractChecker implements Closeable { job.setOutputValueClass(Text.class); job.getConfiguration().setFloat("db.reader.topn.min", min); - + + FileSystem fs = tempDir.getFileSystem(config); try{ - int complete = job.waitForCompletion(true)?0:1; - } catch (InterruptedException | ClassNotFoundException e) { + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "CrawlDbReader job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + fs.delete(tempDir, true); + throw new RuntimeException(message); + } + } catch (IOException | InterruptedException | ClassNotFoundException e) { LOG.error(StringUtils.stringifyException(e)); + fs.delete(tempDir, true); throw e; } @@ -816,13 +841,21 @@ public class CrawlDbReader extends AbstractChecker implements Closeable { job.setNumReduceTasks(1); // create a single file. try{ - int complete = job.waitForCompletion(true)?0:1; - } catch (InterruptedException | ClassNotFoundException e) { + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "CrawlDbReader job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + fs.delete(tempDir, true); + throw new RuntimeException(message); + } + } catch (IOException | InterruptedException | ClassNotFoundException e) { LOG.error(StringUtils.stringifyException(e)); + fs.delete(tempDir, true); throw e; } - FileSystem fs = tempDir.getFileSystem(config); fs.delete(tempDir, true); if (LOG.isInfoEnabled()) { LOG.info("CrawlDb topN: done"); diff --git a/src/java/org/apache/nutch/crawl/DeduplicationJob.java b/src/java/org/apache/nutch/crawl/DeduplicationJob.java index f2283ee..555f9e2 100644 --- a/src/java/org/apache/nutch/crawl/DeduplicationJob.java +++ b/src/java/org/apache/nutch/crawl/DeduplicationJob.java @@ -312,8 +312,17 @@ public class DeduplicationJob extends NutchTool implements Tool { job.setMapperClass(DBFilter.class); job.setReducerClass(DedupReducer.class); + FileSystem fs = tempDir.getFileSystem(getConf()); try { - int complete = job.waitForCompletion(true)?0:1; + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "Crawl job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + fs.delete(tempDir, true); + throw new RuntimeException(message); + } CounterGroup g = job.getCounters().getGroup("DeduplicationJobStatus"); if (g != null) { Counter counter = g.findCounter("Documents marked as duplicate"); @@ -321,8 +330,9 @@ public class DeduplicationJob extends NutchTool implements Tool { LOG.info("Deduplication: " + (int) dups + " documents marked as duplicates"); } - } catch (final Exception e) { + } catch (IOException | InterruptedException | ClassNotFoundException e) { LOG.error("DeduplicationJob: " + StringUtils.stringifyException(e)); + fs.delete(tempDir, true); return -1; } @@ -337,16 +347,24 @@ public class DeduplicationJob extends NutchTool implements Tool { mergeJob.setReducerClass(StatusUpdateReducer.class); try { - int complete = job.waitForCompletion(true)?0:1; - } catch (final Exception e) { + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "Crawl job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + fs.delete(tempDir, true); + throw new RuntimeException(message); + } + } catch (IOException | InterruptedException | ClassNotFoundException e) { LOG.error("DeduplicationMergeJob: " + StringUtils.stringifyException(e)); + fs.delete(tempDir, true); return -1; } CrawlDb.install(mergeJob, dbPath); // clean up - FileSystem fs = tempDir.getFileSystem(getConf()); fs.delete(tempDir, true); long end = System.currentTimeMillis(); diff --git a/src/java/org/apache/nutch/crawl/Generator.java b/src/java/org/apache/nutch/crawl/Generator.java index c972a13..c9096d6 100644 --- a/src/java/org/apache/nutch/crawl/Generator.java +++ b/src/java/org/apache/nutch/crawl/Generator.java @@ -781,10 +781,18 @@ public class Generator extends NutchTool implements Tool { MultipleOutputs.addNamedOutput(job, "sequenceFiles", SequenceFileOutputFormat.class, FloatWritable.class, SelectorEntry.class); try { - int complete = job.waitForCompletion(true)?0:1; + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "Generator job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + NutchJob.cleanupAfterFailure(tempDir, lock, fs); + throw new RuntimeException(message); + } } catch (IOException | InterruptedException | ClassNotFoundException e) { - LockUtil.removeLockFile(getConf(), lock); - fs.delete(tempDir, true); + LOG.error("Generator job failed {}", e); + NutchJob.cleanupAfterFailure(tempDir, lock, fs); throw e; } @@ -838,12 +846,21 @@ public class Generator extends NutchTool implements Tool { job.setOutputValueClass(CrawlDatum.class); FileOutputFormat.setOutputPath(job, tempDir2); try { - int complete = job.waitForCompletion(true)?0:1; + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "Generator job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + NutchJob.cleanupAfterFailure(tempDir, lock, fs); + NutchJob.cleanupAfterFailure(tempDir2, lock, fs); + throw new RuntimeException(message); + } CrawlDb.install(job, dbDir); } catch (IOException | InterruptedException | ClassNotFoundException e) { - LockUtil.removeLockFile(getConf(), lock); - fs.delete(tempDir, true); - fs.delete(tempDir2, true); + LOG.error("Generator job failed {}", e); + NutchJob.cleanupAfterFailure(tempDir, lock, fs); + NutchJob.cleanupAfterFailure(tempDir2, lock, fs); throw e; } @@ -894,8 +911,15 @@ public class Generator extends NutchTool implements Tool { job.setOutputValueClass(CrawlDatum.class); job.setSortComparatorClass(HashComparator.class); try { - int complete = job.waitForCompletion(true)?0:1; - } catch (InterruptedException | ClassNotFoundException e) { + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "Generator job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + throw new RuntimeException(message); + } + } catch (IOException | InterruptedException | ClassNotFoundException e) { LOG.error(StringUtils.stringifyException(e)); throw e; } diff --git a/src/java/org/apache/nutch/crawl/Injector.java b/src/java/org/apache/nutch/crawl/Injector.java index d872dbf..093d483 100644 --- a/src/java/org/apache/nutch/crawl/Injector.java +++ b/src/java/org/apache/nutch/crawl/Injector.java @@ -40,9 +40,9 @@ import org.apache.nutch.net.URLFilters; import org.apache.nutch.net.URLNormalizers; import org.apache.nutch.scoring.ScoringFilterException; import org.apache.nutch.scoring.ScoringFilters; -import org.apache.nutch.util.LockUtil; import org.apache.nutch.service.NutchServer; import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; import org.apache.nutch.util.NutchTool; import org.apache.nutch.util.TimingUtil; @@ -420,7 +420,7 @@ public class Injector extends NutchTool implements Tool { + job.getStatus().getState() + ", reason: " + job.getStatus().getFailureInfo(); LOG.error(message); - cleanupAfterFailure(tempCrawlDb, lock, fs); + NutchJob.cleanupAfterFailure(tempCrawlDb, lock, fs); // throw exception so that calling routine can exit with error throw new RuntimeException(message); } @@ -463,19 +463,7 @@ public class Injector extends NutchTool implements Tool { } } catch (IOException | InterruptedException | ClassNotFoundException e) { LOG.error("Injector job failed", e); - cleanupAfterFailure(tempCrawlDb, lock, fs); - throw e; - } - } - - public void cleanupAfterFailure(Path tempCrawlDb, Path lock, FileSystem fs) - throws IOException { - try { - if (fs.exists(tempCrawlDb)) { - fs.delete(tempCrawlDb, true); - } - LockUtil.removeLockFile(fs, lock); - } catch (IOException e) { + NutchJob.cleanupAfterFailure(tempCrawlDb, lock, fs); throw e; } } diff --git a/src/java/org/apache/nutch/crawl/LinkDb.java b/src/java/org/apache/nutch/crawl/LinkDb.java index c6a32ba..37bfb7b 100644 --- a/src/java/org/apache/nutch/crawl/LinkDb.java +++ b/src/java/org/apache/nutch/crawl/LinkDb.java @@ -228,8 +228,17 @@ public class LinkDb extends NutchTool implements Tool { ParseData.DIR_NAME)); } try { - int complete = job.waitForCompletion(true)?0:1; - } catch (IOException | InterruptedException | ClassNotFoundException e) { + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "LinkDb job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + LockUtil.removeLockFile(fs, lock); + throw new RuntimeException(message); + } + } catch (IOException | InterruptedException | ClassNotFoundException e) { + LOG.error("LinkDb job failed {}", e); LockUtil.removeLockFile(fs, lock); throw e; } @@ -244,10 +253,18 @@ public class LinkDb extends NutchTool implements Tool { FileInputFormat.addInputPath(job, currentLinkDb); FileInputFormat.addInputPath(job, newLinkDb); try { - int complete = job.waitForCompletion(true)?0:1; - } catch (IOException e) { - LockUtil.removeLockFile(fs, lock); - fs.delete(newLinkDb, true); + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "LinkDb job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + NutchJob.cleanupAfterFailure(newLinkDb, lock, fs); + throw new RuntimeException(message); + } + } catch (IOException | InterruptedException | ClassNotFoundException e) { + LOG.error("LinkDb job failed {}", e); + NutchJob.cleanupAfterFailure(newLinkDb, lock, fs); throw e; } fs.delete(newLinkDb, true); diff --git a/src/java/org/apache/nutch/crawl/LinkDbMerger.java b/src/java/org/apache/nutch/crawl/LinkDbMerger.java index c8e3943..f2f0892 100644 --- a/src/java/org/apache/nutch/crawl/LinkDbMerger.java +++ b/src/java/org/apache/nutch/crawl/LinkDbMerger.java @@ -121,7 +121,20 @@ public class LinkDbMerger extends Configured implements Tool { for (int i = 0; i < dbs.length; i++) { FileInputFormat.addInputPath(job, new Path(dbs[i], LinkDb.CURRENT_NAME)); } - int complete = job.waitForCompletion(true)?0:1; + + try { + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "LinkDbMerge job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + throw new RuntimeException(message); + } + } catch (IOException | InterruptedException | ClassNotFoundException e) { + LOG.error("LinkDbMerge job failed {}", e); + throw e; + } FileSystem fs = output.getFileSystem(getConf()); fs.mkdirs(output); fs.rename(FileOutputFormat.getOutputPath(job), new Path(output, diff --git a/src/java/org/apache/nutch/crawl/LinkDbReader.java b/src/java/org/apache/nutch/crawl/LinkDbReader.java index 8efaf0a..519fa59 100644 --- a/src/java/org/apache/nutch/crawl/LinkDbReader.java +++ b/src/java/org/apache/nutch/crawl/LinkDbReader.java @@ -166,8 +166,15 @@ public class LinkDbReader extends AbstractChecker implements Closeable { job.setOutputValueClass(Inlinks.class); try{ - int complete = job.waitForCompletion(true)?0:1; - } catch (InterruptedException | ClassNotFoundException e){ + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "LinkDbRead job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + throw new RuntimeException(message); + } + } catch (IOException | InterruptedException | ClassNotFoundException e){ LOG.error(StringUtils.stringifyException(e)); throw e; } diff --git a/src/java/org/apache/nutch/fetcher/Fetcher.java b/src/java/org/apache/nutch/fetcher/Fetcher.java index 34fb136..ba34d68 100644 --- a/src/java/org/apache/nutch/fetcher/Fetcher.java +++ b/src/java/org/apache/nutch/fetcher/Fetcher.java @@ -495,7 +495,14 @@ public class Fetcher extends NutchTool implements Tool { job.setOutputValueClass(NutchWritable.class); try { - int complete = job.waitForCompletion(true)?0:1; + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "Fetcher job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + throw new RuntimeException(message); + } } catch (InterruptedException | ClassNotFoundException e) { LOG.error(StringUtils.stringifyException(e)); throw e; diff --git a/src/java/org/apache/nutch/hostdb/ReadHostDb.java b/src/java/org/apache/nutch/hostdb/ReadHostDb.java index e53e0c3..408e3ea 100644 --- a/src/java/org/apache/nutch/hostdb/ReadHostDb.java +++ b/src/java/org/apache/nutch/hostdb/ReadHostDb.java @@ -208,7 +208,7 @@ public class ReadHostDb extends Configured implements Tool { throw new RuntimeException(message); } } catch (IOException | InterruptedException | ClassNotFoundException e) { - LOG.error("ReadHostDb job failed", e); + LOG.error("ReadHostDb job failed {}", e); throw e; } diff --git a/src/java/org/apache/nutch/hostdb/UpdateHostDb.java b/src/java/org/apache/nutch/hostdb/UpdateHostDb.java index 9119c35..7209278 100644 --- a/src/java/org/apache/nutch/hostdb/UpdateHostDb.java +++ b/src/java/org/apache/nutch/hostdb/UpdateHostDb.java @@ -129,17 +129,23 @@ public class UpdateHostDb extends Configured implements Tool { conf.setClassLoader(Thread.currentThread().getContextClassLoader()); try { - int complete = job.waitForCompletion(true)?0:1; + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "UpdateHostDb job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + NutchJob.cleanupAfterFailure(tempHostDb, lock, fs); + throw new RuntimeException(message); + } FSUtils.replace(fs, old, current, true); FSUtils.replace(fs, current, tempHostDb, true); if (!preserveBackup && fs.exists(old)) fs.delete(old, true); } catch (Exception e) { - if (fs.exists(tempHostDb)) { - fs.delete(tempHostDb, true); - } - LockUtil.removeLockFile(fs, lock); + LOG.error("UpdateHostDb job failed {}", e); + NutchJob.cleanupAfterFailure(tempHostDb, lock, fs); throw e; } diff --git a/src/java/org/apache/nutch/indexer/CleaningJob.java b/src/java/org/apache/nutch/indexer/CleaningJob.java index a8ac640..3ac8b9e 100644 --- a/src/java/org/apache/nutch/indexer/CleaningJob.java +++ b/src/java/org/apache/nutch/indexer/CleaningJob.java @@ -162,7 +162,14 @@ public class CleaningJob implements Tool { conf.setBoolean(IndexerMapReduce.INDEXER_DELETE, true); try{ - int complete = job.waitForCompletion(true)?0:1; + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "CleaningJob did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + throw new RuntimeException(message); + } } catch (InterruptedException | ClassNotFoundException e) { LOG.error(StringUtils.stringifyException(e)); throw e; diff --git a/src/java/org/apache/nutch/indexer/IndexingJob.java b/src/java/org/apache/nutch/indexer/IndexingJob.java index e4997cb..6757291 100644 --- a/src/java/org/apache/nutch/indexer/IndexingJob.java +++ b/src/java/org/apache/nutch/indexer/IndexingJob.java @@ -146,8 +146,15 @@ public class IndexingJob extends NutchTool implements Tool { FileOutputFormat.setOutputPath(job, tmp); try { try{ - int complete = job.waitForCompletion(true)?0:1; - } catch (InterruptedException | ClassNotFoundException e) { + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "Indexing job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + throw new RuntimeException(message); + } + } catch (IOException | InterruptedException | ClassNotFoundException e) { LOG.error(StringUtils.stringifyException(e)); throw e; } diff --git a/src/java/org/apache/nutch/parse/ParseSegment.java b/src/java/org/apache/nutch/parse/ParseSegment.java index 61aa997..1d64463 100644 --- a/src/java/org/apache/nutch/parse/ParseSegment.java +++ b/src/java/org/apache/nutch/parse/ParseSegment.java @@ -259,8 +259,15 @@ public class ParseSegment extends NutchTool implements Tool { job.setOutputValueClass(ParseImpl.class); try{ - int complete = job.waitForCompletion(true)?0:1; - } catch (InterruptedException | ClassNotFoundException e) { + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "Parse job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + throw new RuntimeException(message); + } + } catch (IOException | InterruptedException | ClassNotFoundException e) { LOG.error(StringUtils.stringifyException(e)); throw e; } diff --git a/src/java/org/apache/nutch/segment/SegmentMerger.java b/src/java/org/apache/nutch/segment/SegmentMerger.java index 780e10a..188ae69 100644 --- a/src/java/org/apache/nutch/segment/SegmentMerger.java +++ b/src/java/org/apache/nutch/segment/SegmentMerger.java @@ -738,7 +738,19 @@ public class SegmentMerger extends Configured implements Tool{ setConf(conf); - int complete = job.waitForCompletion(true)?0:1; + try { + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "SegmentMerger job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + throw new RuntimeException(message); + } + } catch (IOException | InterruptedException | ClassNotFoundException e) { + LOG.error("SegmentMerger job failed {}", e); + throw e; + } } /** diff --git a/src/java/org/apache/nutch/segment/SegmentReader.java b/src/java/org/apache/nutch/segment/SegmentReader.java index 28b88cd..0b65a2b 100644 --- a/src/java/org/apache/nutch/segment/SegmentReader.java +++ b/src/java/org/apache/nutch/segment/SegmentReader.java @@ -244,7 +244,14 @@ public class SegmentReader extends Configured implements Tool { job.setOutputValueClass(NutchWritable.class); try { - int complete = job.waitForCompletion(true)?0:1; + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "SegmentReader job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + throw new RuntimeException(message); + } } catch (IOException | InterruptedException | ClassNotFoundException e ){ LOG.error(StringUtils.stringifyException(e)); throw e; diff --git a/src/java/org/apache/nutch/tools/FreeGenerator.java b/src/java/org/apache/nutch/tools/FreeGenerator.java index 3b01bb4..ab5109e 100644 --- a/src/java/org/apache/nutch/tools/FreeGenerator.java +++ b/src/java/org/apache/nutch/tools/FreeGenerator.java @@ -201,8 +201,15 @@ public class FreeGenerator extends Configured implements Tool { FileOutputFormat.setOutputPath(job, new Path(args[1], new Path(segName, CrawlDatum.GENERATE_DIR_NAME))); try { - int complete = job.waitForCompletion(true)?0:1; - } catch (Exception e) { + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "FreeGenerator job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + throw new RuntimeException(message); + } + } catch (IOException | InterruptedException | ClassNotFoundException e) { LOG.error("FAILED: " + StringUtils.stringifyException(e)); return -1; } diff --git a/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java b/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java index 1f9e660..499b246 100644 --- a/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java +++ b/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java @@ -395,7 +395,14 @@ public class ArcSegmentCreator extends Configured implements Tool { job.setOutputValueClass(NutchWritable.class); try { - int complete = job.waitForCompletion(true)?0:1; + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "ArcSegmentCreator job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + throw new RuntimeException(message); + } } catch (IOException | InterruptedException | ClassNotFoundException e){ LOG.error(StringUtils.stringifyException(e)); throw e; diff --git a/src/java/org/apache/nutch/tools/warc/WARCExporter.java b/src/java/org/apache/nutch/tools/warc/WARCExporter.java index aae8064..2921a97 100644 --- a/src/java/org/apache/nutch/tools/warc/WARCExporter.java +++ b/src/java/org/apache/nutch/tools/warc/WARCExporter.java @@ -285,13 +285,20 @@ public class WARCExporter extends Configured implements Tool { job.setOutputValueClass(WARCWritable.class); try { - int complete = job.waitForCompletion(true)?0:1; + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "WARCExporter job did not succeed, job status:" + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + throw new RuntimeException(message); + } LOG.info(job.getCounters().toString()); long end = System.currentTimeMillis(); LOG.info("WARCExporter: finished at {}, elapsed: {}", sdf.format(end), TimingUtil.elapsedTime(start, end)); - } catch (Exception e) { - LOG.error("Exception caught", e); + } catch (IOException | InterruptedException | ClassNotFoundException e) { + LOG.error("WARCExporter job failed {}", e); return -1; } diff --git a/src/java/org/apache/nutch/util/NutchJob.java b/src/java/org/apache/nutch/util/NutchJob.java index 34a9acd..06f1cc2 100644 --- a/src/java/org/apache/nutch/util/NutchJob.java +++ b/src/java/org/apache/nutch/util/NutchJob.java @@ -19,6 +19,8 @@ package org.apache.nutch.util; import java.io.IOException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; @@ -34,4 +36,19 @@ public class NutchJob extends Job { return Job.getInstance(conf); } + /* + * Clean up the file system in case of a job failure. + */ + public static void cleanupAfterFailure(Path tempDir, Path lock, FileSystem fs) + throws IOException { + try { + if (fs.exists(tempDir)) { + fs.delete(tempDir, true); + } + LockUtil.removeLockFile(fs, lock); + } catch (IOException e) { + throw e; + } + } + } diff --git a/src/java/org/apache/nutch/util/SitemapProcessor.java b/src/java/org/apache/nutch/util/SitemapProcessor.java index 380ac07..70f4372 100644 --- a/src/java/org/apache/nutch/util/SitemapProcessor.java +++ b/src/java/org/apache/nutch/util/SitemapProcessor.java @@ -52,6 +52,7 @@ import org.apache.nutch.protocol.Protocol; import org.apache.nutch.protocol.ProtocolFactory; import org.apache.nutch.protocol.ProtocolOutput; import org.apache.nutch.protocol.ProtocolStatus; +import org.apache.nutch.util.NutchJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -383,7 +384,7 @@ public class SitemapProcessor extends Configured implements Tool { + " job did not succeed, job status: " + job.getStatus().getState() + ", reason: " + job.getStatus().getFailureInfo(); LOG.error(message); - cleanupAfterFailure(tempCrawlDb, lock, fs); + NutchJob.cleanupAfterFailure(tempCrawlDb, lock, fs); // throw exception so that calling routine can exit with error throw new RuntimeException(message); } @@ -415,19 +416,7 @@ public class SitemapProcessor extends Configured implements Tool { } } catch (IOException | InterruptedException | ClassNotFoundException e) { LOG.error("SitemapProcessor_" + crawldb.toString(), e); - cleanupAfterFailure(tempCrawlDb, lock, fs); - throw e; - } - } - - public void cleanupAfterFailure(Path tempCrawlDb, Path lock, FileSystem fs) - throws IOException { - try { - if (fs.exists(tempCrawlDb)) { - fs.delete(tempCrawlDb, true); - } - LockUtil.removeLockFile(fs, lock); - } catch (IOException e) { + NutchJob.cleanupAfterFailure(tempCrawlDb, lock, fs); throw e; } } -- To stop receiving notification emails like this one, please contact sna...@apache.org.