Author: mc
Date: Mon Oct 17 12:49:45 2005
New Revision: 325971

URL: http://svn.apache.org/viewcvs?rev=325971&view=rev
Log:

  Fix bug when killing a job.  Still-undispatched 
tasks were not properly eliminated, so they would
keep going, long after the job was "killed".


Modified:
    
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java?rev=325971&r1=325970&r2=325971&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java 
(original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java 
Mon Oct 17 12:49:45 2005
@@ -648,6 +648,20 @@
         public synchronized void kill() {
             if (status.getRunState() != JobStatus.FAILED) {
                 this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, 
JobStatus.FAILED);
+
+                //
+                // Kill all the pending tasks
+                //
+                synchronized (unassignedTasks) {
+                    for (Iterator it = unassignedTasks.iterator(); 
it.hasNext(); ) {
+                        String taskid = (String) it.next();
+                        if ((incompleteMapTasks.get(taskid) != null) ||
+                            (incompleteReduceTasks.get(taskid) != null)) {
+                            it.remove();
+                        }
+                    }
+                }
+
                 this.finishTime = System.currentTimeMillis();
             }
         }
@@ -925,7 +939,9 @@
                 attemptedReduceExecutions++;
             }
             updateTaskStatus(taskid, new TaskStatus(taskid, 0.0f, 
TaskStatus.UNASSIGNED, "", ""));
-            unassignedTasks.add(taskid);
+            synchronized (unassignedTasks) {
+                unassignedTasks.add(taskid);
+            }
         }
     }
 
@@ -985,28 +1001,30 @@
      * just grabs a single item out of the pending task list and hands it back.
      */
     Task getTaskAssignment(String taskTracker) {
-        if (unassignedTasks.size() > 0) {
-            String taskid = (String) unassignedTasks.elementAt(0);
-            unassignedTasks.remove(taskid);
-
-            // Move task status to RUNNING
-            JobInProgress job = (JobInProgress) jobs.get((String) 
taskToJobMap.get(taskid));
-            job.updateTaskStatus(taskid, new TaskStatus(taskid, 0.0f, 
TaskStatus.RUNNING, "", ""));
-
-            // Remember where we are running it
-            TreeSet taskset = (TreeSet) trackerToTaskMap.get(taskTracker);
-            if (taskset == null) {
-                taskset = new TreeSet();
-                trackerToTaskMap.put(taskTracker, taskset);
-            }
-            LOG.info("Adding task '" + taskid + "' to set for tracker '" + 
taskTracker + "'");
-            taskset.add(taskid);
-
-            taskToTrackerMap.put(taskid, taskTracker);
-
-            return job.getTask(taskid);
-        } else {
-            return null;
+        synchronized (unassignedTasks) {
+            if (unassignedTasks.size() > 0) {
+                String taskid = (String) unassignedTasks.elementAt(0);
+                unassignedTasks.remove(taskid);
+
+                // Move task status to RUNNING
+                JobInProgress job = (JobInProgress) jobs.get((String) 
taskToJobMap.get(taskid));
+                job.updateTaskStatus(taskid, new TaskStatus(taskid, 0.0f, 
TaskStatus.RUNNING, "", ""));
+
+                // Remember where we are running it
+                TreeSet taskset = (TreeSet) trackerToTaskMap.get(taskTracker);
+                if (taskset == null) {
+                    taskset = new TreeSet();
+                    trackerToTaskMap.put(taskTracker, taskset);
+                }
+                LOG.info("Adding task '" + taskid + "' to set for tracker '" + 
taskTracker + "'");
+                taskset.add(taskid);
+
+                taskToTrackerMap.put(taskid, taskTracker);
+
+                return job.getTask(taskid);
+            } else {
+                return null;
+            }
         }
     }
 


Reply via email to