Author: mc
Date: Tue Aug 30 09:58:37 2005
New Revision: 264812

URL: http://svn.apache.org/viewcvs?rev=264812&view=rev
Log:
Fix task killing and cleanup; there was a code path
where the tasktracker could silently drop a task.

Modified:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=264812&r1=264811&r2=264812&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java Tue Aug 30 09:58:37 2005
@@ -126,14 +126,9 @@
      */
     public synchronized void close() throws IOException {
         // Kill running tasks
-        Vector v = new Vector();
         for (Iterator it = tasks.values().iterator(); it.hasNext(); ) {
             TaskInProgress tip = (TaskInProgress) it.next();
-            v.add(tip);
-        }
-        for (Iterator it = v.iterator(); it.hasNext(); ) {
-            TaskInProgress tip = (TaskInProgress) it.next();
-            tip.cleanup();
+            tip.jobHasFinished();
         }

         // Wait for them to die and report in
@@ -237,7 +232,7 @@
                     (System.currentTimeMillis() - tip.getLastProgressReport() > TASK_TIMEOUT)) {
                     LOG.info("Task " + tip.getTask().getTaskId() + " timed out. Killing.");
                     tip.reportDiagnosticInfo("Timed out.");
-                    tip.cleanup();
+                    tip.killAndCleanup();
                 }
             }
         }
@@ -249,7 +244,7 @@
         if (toCloseId != null) {
             synchronized (this) {
                 TaskInProgress tip = (TaskInProgress) tasks.get(toCloseId);
-                tip.cleanup();
+                tip.jobHasFinished();
             }
         }
         lastHeartbeat = now;
@@ -308,7 +303,7 @@
         StringBuffer diagnosticInfo = new StringBuffer();
         TaskRunner runner;
         boolean done = false;
-        boolean closeRunnerUponEnd = false;
+        boolean wasKilled = false;

         /**
          */
@@ -444,40 +439,53 @@
             }

             //
-            // We've already tried to 'cleanup' this task. So once
-            // the process actually finishes, finish the cleanup work.
+            // If the task has failed, or if the task was killAndCleanup()'ed,
+            // we should clean up right away. We only wait to cleanup
+            // if the task succeeded, and its results might be useful
+            // later on to downstream job processing.
             //
-            if (closeRunnerUponEnd) {
-                runningTasks.remove(task.getTaskId());
-                tasks.remove(task.getTaskId());
+            if (wasKilled || runstate == TaskStatus.FAILED) {
                 try {
-                    runner.close();
+                    cleanup();
                 } catch (IOException ie) {
                 }
-                try {
-                    FileUtil.fullyDelete(localTaskDir);
-                } catch (IOException ie) {
-                }
             }
         }

         /**
-         * The owning job is done, and this task is no longer needed.
-         * This method cleans up the task, first killing it if necessary.
+         * We no longer need anything from this task, as the job has
+         * finished. If the task is still running, kill it (and clean up
+         */
+        public synchronized void jobHasFinished() throws IOException {
+            if (getRunState() == TaskStatus.RUNNING) {
+                killAndCleanup();
+            } else {
+                cleanup();
+            }
+        }
+
+        /**
+         * This task has run on too long, and should be killed.
          */
-        public synchronized void cleanup() throws IOException {
+        public synchronized void killAndCleanup() throws IOException {
             if (runstate == TaskStatus.RUNNING) {
-                closeRunnerUponEnd = true;
+                wasKilled = true;
                 runner.kill();
-            } else {
-                runningTasks.remove(task.getTaskId());
-                tasks.remove(task.getTaskId());
-                try {
-                    runner.close();
-                } catch (IOException ie) {
-                }
-                FileUtil.fullyDelete(localTaskDir);
             }
+        }
+
+        /**
+         * We no longer need anything from this task. Either the
+         * controlling job is all done and the files have been copied
+         * away, or the task failed and we don't need the remains.
+         */
+        synchronized void cleanup() throws IOException {
+            tasks.remove(task.getTaskId());
+            try {
+                runner.close();
+            } catch (IOException ie) {
+            }
+            FileUtil.fullyDelete(localTaskDir);
         }
     }
