Author: mc
Date: Mon Oct 17 12:49:45 2005
New Revision: 325971
URL: http://svn.apache.org/viewcvs?rev=325971&view=rev
Log:
Fix bug when killing a job. Still-undispatched
tasks were not properly eliminated, so they would
keep going, long after the job was "killed".
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java?rev=325971&r1=325970&r2=325971&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java Mon Oct 17 12:49:45 2005
@@ -648,6 +648,20 @@
public synchronized void kill() {
if (status.getRunState() != JobStatus.FAILED) {
this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.FAILED);
+
+ //
+ // Kill all the pending tasks
+ //
+ synchronized (unassignedTasks) {
+ for (Iterator it = unassignedTasks.iterator(); it.hasNext(); ) {
+ String taskid = (String) it.next();
+ if ((incompleteMapTasks.get(taskid) != null) ||
+ (incompleteReduceTasks.get(taskid) != null)) {
+ it.remove();
+ }
+ }
+ }
+
this.finishTime = System.currentTimeMillis();
}
}
@@ -925,7 +939,9 @@
attemptedReduceExecutions++;
}
updateTaskStatus(taskid, new TaskStatus(taskid, 0.0f, TaskStatus.UNASSIGNED, "", ""));
- unassignedTasks.add(taskid);
+ synchronized (unassignedTasks) {
+ unassignedTasks.add(taskid);
+ }
}
}
@@ -985,28 +1001,30 @@
* just grabs a single item out of the pending task list and hands it back.
*/
Task getTaskAssignment(String taskTracker) {
- if (unassignedTasks.size() > 0) {
- String taskid = (String) unassignedTasks.elementAt(0);
- unassignedTasks.remove(taskid);
-
- // Move task status to RUNNING
- JobInProgress job = (JobInProgress) jobs.get((String) taskToJobMap.get(taskid));
- job.updateTaskStatus(taskid, new TaskStatus(taskid, 0.0f, TaskStatus.RUNNING, "", ""));
-
- // Remember where we are running it
- TreeSet taskset = (TreeSet) trackerToTaskMap.get(taskTracker);
- if (taskset == null) {
- taskset = new TreeSet();
- trackerToTaskMap.put(taskTracker, taskset);
- }
- LOG.info("Adding task '" + taskid + "' to set for tracker '" + taskTracker + "'");
- taskset.add(taskid);
-
- taskToTrackerMap.put(taskid, taskTracker);
-
- return job.getTask(taskid);
- } else {
- return null;
+ synchronized (unassignedTasks) {
+ if (unassignedTasks.size() > 0) {
+ String taskid = (String) unassignedTasks.elementAt(0);
+ unassignedTasks.remove(taskid);
+
+ // Move task status to RUNNING
+ JobInProgress job = (JobInProgress) jobs.get((String) taskToJobMap.get(taskid));
+ job.updateTaskStatus(taskid, new TaskStatus(taskid, 0.0f, TaskStatus.RUNNING, "", ""));
+
+ // Remember where we are running it
+ TreeSet taskset = (TreeSet) trackerToTaskMap.get(taskTracker);
+ if (taskset == null) {
+ taskset = new TreeSet();
+ trackerToTaskMap.put(taskTracker, taskset);
+ }
+ LOG.info("Adding task '" + taskid + "' to set for tracker '" + taskTracker + "'");
+ taskset.add(taskid);
+
+ taskToTrackerMap.put(taskid, taskTracker);
+
+ return job.getTask(taskid);
+ } else {
+ return null;
+ }
}
}