Author: mc
Date: Tue Aug 2 11:43:14 2005
New Revision: 227062
URL: http://svn.apache.org/viewcvs?rev=227062&view=rev
Log:
The TaskTracker will now kill child tasks that fail to
report progress for a 60-second period. This will take care
of hung tasks.
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
URL:
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=227062&r1=227061&r2=227062&view=diff
==============================================================================
---
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
(original)
+++
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
Tue Aug 2 11:43:14 2005
@@ -35,7 +35,10 @@
public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
MapOutputProtocol, Runnable {
private static final int MAX_CURRENT_TASKS =
NutchConf.get().getInt("mapred.tasktracker.tasks.maximum", 2);
+
static final long WAIT_FOR_DONE = 3 * 1000;
+ static final long TASK_MIN_PROGRESS_INTERVAL = 60 * 1000;
+
static final int STALE_STATE = 1;
public static final Logger LOG =
@@ -221,6 +224,18 @@
}
//
+ // Kill any tasks that have not reported progress in the last X
seconds.
+ //
+ for (Iterator it = runningTasks.values().iterator(); it.hasNext();
) {
+ TaskInProgress tip = (TaskInProgress) it.next();
+ if ((tip.getRunState() == TaskStatus.RUNNING) &&
+ (System.currentTimeMillis() - tip.getLastProgressReport()
> TASK_MIN_PROGRESS_INTERVAL)) {
+
+ tip.cleanup();
+ }
+ }
+
+ //
// Check for any Tasks whose job may have ended
//
String toCloseId = jobClient.pollForClosedTask(taskTrackerName);
@@ -280,6 +295,7 @@
File localTaskDir;
float progress;
int runstate;
+ long lastProgressReport;
StringBuffer diagnosticInfo = new StringBuffer();
TaskRunner runner;
boolean done = false;
@@ -289,6 +305,7 @@
*/
public TaskInProgress(Task task) throws IOException {
this.task = task;
+ this.lastProgressReport = System.currentTimeMillis();
this.localTaskDir = new File(localDir, task.getTaskId());
if (localTaskDir.exists()) {
FileUtil.fullyDelete(localTaskDir);
@@ -359,6 +376,19 @@
LOG.info(task.getTaskId()+" "+p+"% "+state);
this.progress = p;
this.runstate = TaskStatus.RUNNING;
+ this.lastProgressReport = System.currentTimeMillis();
+ }
+
+ /**
+ */
+ public long getLastProgressReport() {
+ return lastProgressReport;
+ }
+
+ /**
+ */
+ public int getRunState() {
+ return runstate;
}
/**