Author: mc
Date: Tue Aug 30 09:58:37 2005
New Revision: 264812

URL: http://svn.apache.org/viewcvs?rev=264812&view=rev
Log:

  Fix task killing and cleanup; there was a code path where
the tasktracker could silently drop a task.


Modified:
    
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=264812&r1=264811&r2=264812&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java Tue Aug 30 09:58:37 2005
@@ -126,14 +126,9 @@
      */
     public synchronized void close() throws IOException {
         // Kill running tasks
-        Vector v = new Vector();
         for (Iterator it = tasks.values().iterator(); it.hasNext(); ) {
             TaskInProgress tip = (TaskInProgress) it.next();
-            v.add(tip);
-        }
-        for (Iterator it = v.iterator(); it.hasNext(); ) { 
-            TaskInProgress tip = (TaskInProgress) it.next();           
-            tip.cleanup();
+            tip.jobHasFinished();
         }
 
         // Wait for them to die and report in
@@ -237,7 +232,7 @@
                         (System.currentTimeMillis() - tip.getLastProgressReport() > TASK_TIMEOUT)) {
                         LOG.info("Task " + tip.getTask().getTaskId() + " timed out.  Killing.");
                         tip.reportDiagnosticInfo("Timed out.");
-                        tip.cleanup();
+                        tip.killAndCleanup();
                     }
                 }
             }
@@ -249,7 +244,7 @@
             if (toCloseId != null) {
               synchronized (this) {
                 TaskInProgress tip = (TaskInProgress) tasks.get(toCloseId);
-                tip.cleanup();
+                tip.jobHasFinished();
               }
             }
             lastHeartbeat = now;
@@ -308,7 +303,7 @@
         StringBuffer diagnosticInfo = new StringBuffer();
         TaskRunner runner;
         boolean done = false;
-        boolean closeRunnerUponEnd = false;
+        boolean wasKilled = false;
 
         /**
          */
@@ -444,40 +439,53 @@
             }
 
             //
-            // We've already tried to 'cleanup' this task.  So once
-            // the process actually finishes, finish the cleanup work.
+            // If the task has failed, or if the task was killAndCleanup()'ed,
+            // we should clean up right away.  We only wait to cleanup
+            // if the task succeeded, and its results might be useful
+            // later on to downstream job processing.
             //
-            if (closeRunnerUponEnd) {
-                runningTasks.remove(task.getTaskId());
-                tasks.remove(task.getTaskId());
+            if (wasKilled || runstate == TaskStatus.FAILED) {
                 try {
-                    runner.close();
+                    cleanup();
                 } catch (IOException ie) {
                 }
-                try {
-                    FileUtil.fullyDelete(localTaskDir);
-                } catch (IOException ie) {
-                }                
             }
         }
 
         /**
-         * The owning job is done, and this task is no longer needed.  
-         * This method cleans up the task, first killing it if necessary.
+         * The job has finished, so we no longer need anything from this task.
+         * If it is still running, kill it (and clean up); otherwise just clean up.
+         */
+        public synchronized void jobHasFinished() throws IOException {
+            if (getRunState() == TaskStatus.RUNNING) {
+                killAndCleanup();
+            } else {
+                cleanup();
+            }
+        }
+
+        /**
+         * This task has run on too long, and should be killed.
          */
-        public synchronized void cleanup() throws IOException {
+        public synchronized void killAndCleanup() throws IOException {
             if (runstate == TaskStatus.RUNNING) {
-                closeRunnerUponEnd = true;
+                wasKilled = true;
                 runner.kill();
-            } else {
-                runningTasks.remove(task.getTaskId());
-                tasks.remove(task.getTaskId());
-                try {
-                    runner.close();
-                } catch (IOException ie) {
-                }
-                FileUtil.fullyDelete(localTaskDir);
             }
+        }
+
+        /**
+         * We no longer need anything from this task.  Either the 
+         * controlling job is all done and the files have been copied
+         * away, or the task failed and we don't need the remains.
+         */
+        synchronized void cleanup() throws IOException {
+            tasks.remove(task.getTaskId());
+            try {
+                runner.close();
+            } catch (IOException ie) {
+            }
+            FileUtil.fullyDelete(localTaskDir);
         }
     }
 


Reply via email to