Author: tgraves
Date: Mon Apr 2 20:30:53 2012
New Revision: 1308537
URL: http://svn.apache.org/viewvc?rev=1308537&view=rev
Log:
merge -r 1308532:1308533 from branch-2. FIXES: MAPREDUCE-4089
Added:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java
- copied unchanged from r1308533,
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1308537&r1=1308536&r2=1308537&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Mon Apr 2 20:30:53 2012
@@ -13,7 +13,9 @@ Release 0.23.3 - UNRELEASED
BUG FIXES
MAPREDUCE-4092. commitJob Exception does not fail job (Jon Eagles via
- bobby)
+ bobby)
+
+ MAPREDUCE-4089. Hung Tasks never time out. (Robert Evans via tgraves)
Release 0.23.2 - UNRELEASED
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1308537&r1=1308536&r2=1308537&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Mon Apr 2 20:30:53 2012
@@ -175,7 +175,7 @@ public class TaskAttemptListenerImpl ext
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
TypeConverter.toYarn(taskAttemptID);
- taskHeartbeatHandler.receivedPing(attemptID);
+ taskHeartbeatHandler.progressing(attemptID);
Job job = context.getJob(attemptID.getTaskId().getJobId());
Task task = job.getTask(attemptID.getTaskId());
@@ -203,7 +203,7 @@ public class TaskAttemptListenerImpl ext
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
TypeConverter.toYarn(taskAttemptID);
- taskHeartbeatHandler.receivedPing(attemptID);
+ taskHeartbeatHandler.progressing(attemptID);
//Ignorable TaskStatus? - since a task will send a LastStatusUpdate
context.getEventHandler().handle(
new TaskAttemptEvent(attemptID,
@@ -217,7 +217,7 @@ public class TaskAttemptListenerImpl ext
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
TypeConverter.toYarn(taskAttemptID);
- taskHeartbeatHandler.receivedPing(attemptID);
+ taskHeartbeatHandler.progressing(attemptID);
context.getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
@@ -270,7 +270,7 @@ public class TaskAttemptListenerImpl ext
context.getJob(attemptID.getTaskId().getJobId()).getTaskAttemptCompletionEvents(
fromEventId, maxEvents);
- taskHeartbeatHandler.receivedPing(attemptID);
+ taskHeartbeatHandler.progressing(attemptID);
// filter the events to return only map completion events in old format
List<TaskCompletionEvent> mapEvents = new ArrayList<TaskCompletionEvent>();
@@ -287,7 +287,7 @@ public class TaskAttemptListenerImpl ext
@Override
public boolean ping(TaskAttemptID taskAttemptID) throws IOException {
LOG.info("Ping from " + taskAttemptID.toString());
- taskHeartbeatHandler.receivedPing(TypeConverter.toYarn(taskAttemptID));
+ taskHeartbeatHandler.pinged(TypeConverter.toYarn(taskAttemptID));
return true;
}
@@ -299,7 +299,7 @@ public class TaskAttemptListenerImpl ext
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
TypeConverter.toYarn(taskAttemptID);
- taskHeartbeatHandler.receivedPing(attemptID);
+ taskHeartbeatHandler.progressing(attemptID);
// This is mainly used for cases where we want to propagate exception traces
// of tasks that fail.
@@ -317,7 +317,7 @@ public class TaskAttemptListenerImpl ext
LOG.info("Status update from " + taskAttemptID.toString());
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
TypeConverter.toYarn(taskAttemptID);
- taskHeartbeatHandler.receivedPing(yarnAttemptID);
+ taskHeartbeatHandler.progressing(yarnAttemptID);
TaskAttemptStatus taskAttemptStatus =
new TaskAttemptStatus();
taskAttemptStatus.id = yarnAttemptID;
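
The listener changes above split the old receivedPing() notification in two: progressing() for RPCs that carry real work (status updates, commit requests, done and fatal-error reports) and pinged() for the bare liveness ping, so a task that only pings without ever reporting progress can still be timed out. A rough standalone sketch of that split (hypothetical names, not the Hadoop classes) might look like:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch mirroring the split above; not the Hadoop classes.
class HeartbeatSketch {

  static class Times {
    long lastPing;
    long lastProgress;
    Times(long now) { lastPing = now; lastProgress = now; }
  }

  private final ConcurrentMap<String, Times> attempts =
      new ConcurrentHashMap<String, Times>();

  void register(String attemptId) {
    attempts.put(attemptId, new Times(System.currentTimeMillis()));
  }

  // Bare ping() RPC: proves the task JVM is alive, nothing more.
  void pinged(String attemptId) {
    Times t = attempts.get(attemptId);
    if (t != null) {
      synchronized (t) { t.lastPing = System.currentTimeMillis(); }
    }
  }

  // statusUpdate/commitPending/done/etc.: real forward progress, so both
  // timestamps advance.
  void progressing(String attemptId) {
    Times t = attempts.get(attemptId);
    if (t != null) {
      synchronized (t) {
        long now = System.currentTimeMillis();
        t.lastPing = now;
        t.lastProgress = now;
      }
    }
  }
}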
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java?rev=1308537&r1=1308536&r2=1308537&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java Mon Apr 2 20:30:53 2012
@@ -44,9 +44,36 @@ import org.apache.hadoop.yarn.service.Ab
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public class TaskHeartbeatHandler extends AbstractService {
-
+
+ private static class ReportTime {
+ private long lastPing;
+ private long lastProgress;
+
+ public ReportTime(long time) {
+ setLastProgress(time);
+ }
+
+ public synchronized void setLastPing(long time) {
+ lastPing = time;
+ }
+
+ public synchronized void setLastProgress(long time) {
+ lastProgress = time;
+ lastPing = time;
+ }
+
+ public synchronized long getLastPing() {
+ return lastPing;
+ }
+
+ public synchronized long getLastProgress() {
+ return lastProgress;
+ }
+ }
+
private static final Log LOG = LogFactory.getLog(TaskHeartbeatHandler.class);
-
+ private static final int PING_TIMEOUT = 5 * 60 * 1000;
+
//thread which runs periodically to see the last time since a heartbeat is
//received from a task.
private Thread lostTaskCheckerThread;
@@ -56,8 +83,8 @@ public class TaskHeartbeatHandler extend
private final EventHandler eventHandler;
private final Clock clock;
-
- private ConcurrentMap<TaskAttemptId, Long> runningAttempts;
+
+ private ConcurrentMap<TaskAttemptId, ReportTime> runningAttempts;
public TaskHeartbeatHandler(EventHandler eventHandler, Clock clock,
int numThreads) {
@@ -65,7 +92,7 @@ public class TaskHeartbeatHandler extend
this.eventHandler = eventHandler;
this.clock = clock;
runningAttempts =
- new ConcurrentHashMap<TaskAttemptId, Long>(16, 0.75f, numThreads);
+ new ConcurrentHashMap<TaskAttemptId, ReportTime>(16, 0.75f, numThreads);
}
@Override
@@ -91,14 +118,26 @@ public class TaskHeartbeatHandler extend
super.stop();
}
- public void receivedPing(TaskAttemptId attemptID) {
+ public void progressing(TaskAttemptId attemptID) {
//only put for the registered attempts
//TODO throw an exception if the task isn't registered.
- runningAttempts.replace(attemptID, clock.getTime());
+ ReportTime time = runningAttempts.get(attemptID);
+ if(time != null) {
+ time.setLastProgress(clock.getTime());
+ }
}
+ public void pinged(TaskAttemptId attemptID) {
+ //only put for the registered attempts
+ //TODO throw an exception if the task isn't registered.
+ ReportTime time = runningAttempts.get(attemptID);
+ if(time != null) {
+ time.setLastPing(clock.getTime());
+ }
+ }
+
public void register(TaskAttemptId attemptID) {
- runningAttempts.put(attemptID, clock.getTime());
+ runningAttempts.put(attemptID, new ReportTime(clock.getTime()));
}
public void unregister(TaskAttemptId attemptID) {
@@ -110,30 +149,27 @@ public class TaskHeartbeatHandler extend
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
- Iterator<Map.Entry<TaskAttemptId, Long>> iterator =
+ Iterator<Map.Entry<TaskAttemptId, ReportTime>> iterator =
runningAttempts.entrySet().iterator();
// avoid calculating current time every time in loop
long currentTime = clock.getTime();
while (iterator.hasNext()) {
- Map.Entry<TaskAttemptId, Long> entry = iterator.next();
- if (currentTime > entry.getValue() + taskTimeOut) {
-
- //In case the iterator isn't picking up the latest.
- // Extra lookup outside of the iterator - but only if the task
- // is considered to be timed out.
- Long taskTime = runningAttempts.get(entry.getKey());
- if (taskTime != null && currentTime > taskTime + taskTimeOut) {
- // task is lost, remove from the list and raise lost event
- iterator.remove();
- eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry
- .getKey(), "AttemptID:" + entry.getKey().toString()
- + " Timed out after " + taskTimeOut / 1000 + " secs"));
- eventHandler.handle(new TaskAttemptEvent(entry.getKey(),
- TaskAttemptEventType.TA_TIMED_OUT));
- }
-
+ Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next();
+ boolean taskTimedOut = (taskTimeOut > 0) &&
+ (currentTime > (entry.getValue().getLastProgress() + taskTimeOut));
+ boolean pingTimedOut =
+ (currentTime > (entry.getValue().getLastPing() + PING_TIMEOUT));
+
+ if(taskTimedOut || pingTimedOut) {
+ // task is lost, remove from the list and raise lost event
+ iterator.remove();
+ eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry
+ .getKey(), "AttemptID:" + entry.getKey().toString()
+ + " Timed out after " + taskTimeOut / 1000 + " secs"));
+ eventHandler.handle(new TaskAttemptEvent(entry.getKey(),
+ TaskAttemptEventType.TA_TIMED_OUT));
}
}
try {
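
The checker loop above now applies two independent limits: the configurable progress timeout, which is skipped entirely when set to 0, and a hard-coded five-minute ping timeout. A hedged, self-contained illustration of that condition (plain Java with assumed example values, not Hadoop code):

// Standalone sketch of the dual-timeout check done by the monitor thread.
public class TimeoutCheckSketch {
  public static void main(String[] args) {
    long taskTimeOut = 600000L;         // configurable progress timeout; 0 disables it
    long pingTimeOut = 5 * 60 * 1000L;  // fixed PING_TIMEOUT from the patch
    long now = System.currentTimeMillis();

    long lastProgress = now - 700000L;  // no status update for ~11.7 minutes
    long lastPing = now - 10000L;       // but the task is still pinging

    boolean taskTimedOut = (taskTimeOut > 0) && (now > lastProgress + taskTimeOut);
    boolean pingTimedOut = (now > lastPing + pingTimeOut);

    if (taskTimedOut || pingTimedOut) {
      // The real handler removes the attempt from its map and raises a
      // TA_TIMED_OUT event with a diagnostic message.
      System.out.println("attempt would be marked TA_TIMED_OUT");
    }
  }
}

With these values the progress timeout fires even though pings keep arriving, which is exactly the hung-task case MAPREDUCE-4089 addresses.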
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1308537&r1=1308536&r2=1308537&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Mon Apr 2 20:30:53 2012
@@ -351,7 +351,7 @@
<value>600000</value>
<description>The number of milliseconds before a task will be
terminated if it neither reads an input, writes an output, nor
- updates its status string.
+ updates its status string. A value of 0 disables the timeout.
</description>
</property>
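
Note that only this configurable timeout honors the 0 value; the five-minute ping timeout added in TaskHeartbeatHandler is hard-coded and still applies. Assuming the property wrapping this description is mapreduce.task.timeout (the name is not shown in the hunk above), disabling the progress timeout would look like:

<!-- Hedged example; the property name is assumed, not shown in this diff. -->
<property>
  <name>mapreduce.task.timeout</name>
  <value>0</value>
  <description>0 disables the progress timeout; the AM's 5-minute ping
  timeout still applies.</description>
</property>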