Repository: hadoop Updated Branches: refs/heads/branch-2.7 e650fcf25 -> bb44d8e83
Revert MAPREDUCE-5124 from 2.7.5. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bb44d8e8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bb44d8e8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bb44d8e8 Branch: refs/heads/branch-2.7 Commit: bb44d8e8393268a4d2b5170e49fa060ad03605dc Parents: e650fcf Author: Konstantin V Shvachko <s...@apache.org> Authored: Fri Dec 1 17:14:52 2017 -0800 Committer: Konstantin V Shvachko <s...@apache.org> Committed: Fri Dec 1 17:14:52 2017 -0800 ---------------------------------------------------------------------- .../hadoop/mapred/TaskAttemptListenerImpl.java | 71 +----- .../job/event/TaskAttemptStatusUpdateEvent.java | 12 +- .../v2/app/job/impl/TaskAttemptImpl.java | 19 +- .../mapred/TestTaskAttemptListenerImpl.java | 248 +++---------------- .../mapreduce/v2/app/TestFetchFailure.java | 3 +- .../mapreduce/v2/app/TestMRClientService.java | 4 +- .../v2/TestSpeculativeExecutionWithMRApp.java | 13 +- 7 files changed, 63 insertions(+), 307 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb44d8e8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index 53d758d..6627604 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -22,11 +22,9 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; -import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,7 +37,6 @@ import org.apache.hadoop.mapred.SortedRanges.Range; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; -import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler; @@ -58,8 +55,6 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import com.google.common.annotations.VisibleForTesting; - /** * This class is responsible for talking to the task umblical. * It also converts all the old data structures @@ -85,11 +80,6 @@ public class TaskAttemptListenerImpl extends CompositeService private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToActiveAttemptMap = new ConcurrentHashMap<WrappedJvmID, org.apache.hadoop.mapred.Task>(); - - private ConcurrentMap<TaskAttemptId, - AtomicReference<TaskAttemptStatus>> attemptIdToStatus - = new ConcurrentHashMap<>(); - private Set<WrappedJvmID> launchedJVMs = Collections .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); @@ -339,14 +329,6 @@ public class TaskAttemptListenerImpl extends CompositeService TaskStatus taskStatus) throws IOException, InterruptedException { org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID = TypeConverter.toYarn(taskAttemptID); - - AtomicReference<TaskAttemptStatus> lastStatusRef = - attemptIdToStatus.get(yarnAttemptID); - if (lastStatusRef == null) { - throw new IllegalStateException("Status update was called" - + " with illegal TaskAttemptId: " + yarnAttemptID); - } - taskHeartbeatHandler.progressing(yarnAttemptID); TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus(); @@ -404,8 +386,9 @@ public class TaskAttemptListenerImpl extends CompositeService // // isn't ever changed by the Task itself. // taskStatus.getIncludeCounters(); - coalesceStatusUpdate(yarnAttemptID, taskAttemptStatus, lastStatusRef); - + context.getEventHandler().handle( + new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id, + taskAttemptStatus)); return true; } @@ -486,9 +469,6 @@ public class TaskAttemptListenerImpl extends CompositeService launchedJVMs.add(jvmId); taskHeartbeatHandler.register(attemptID); - - attemptIdToStatus.put(attemptID, - new AtomicReference<TaskAttemptStatus>()); } @Override @@ -510,8 +490,6 @@ public class TaskAttemptListenerImpl extends CompositeService //unregister this attempt taskHeartbeatHandler.unregister(attemptID); - - attemptIdToStatus.remove(attemptID); } @Override @@ -520,47 +498,4 @@ public class TaskAttemptListenerImpl extends CompositeService return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash); } - - private void coalesceStatusUpdate(TaskAttemptId yarnAttemptID, - TaskAttemptStatus taskAttemptStatus, - AtomicReference<TaskAttemptStatus> lastStatusRef) { - boolean asyncUpdatedNeeded = false; - TaskAttemptStatus lastStatus = lastStatusRef.get(); - - if (lastStatus == null) { - lastStatusRef.set(taskAttemptStatus); - asyncUpdatedNeeded = true; - } else { - List<TaskAttemptId> oldFetchFailedMaps = - taskAttemptStatus.fetchFailedMaps; - - // merge fetchFailedMaps from the previous update - if (lastStatus.fetchFailedMaps != null) { - if (taskAttemptStatus.fetchFailedMaps == null) { - taskAttemptStatus.fetchFailedMaps = lastStatus.fetchFailedMaps; - } else { - taskAttemptStatus.fetchFailedMaps.addAll(lastStatus.fetchFailedMaps); - } - } - - if (!lastStatusRef.compareAndSet(lastStatus, taskAttemptStatus)) { - // update failed - async dispatcher has processed it in the meantime - taskAttemptStatus.fetchFailedMaps = oldFetchFailedMaps; - lastStatusRef.set(taskAttemptStatus); - asyncUpdatedNeeded = true; - } - } - - if (asyncUpdatedNeeded) { - context.getEventHandler().handle( - new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id, - lastStatusRef)); - } - } - - @VisibleForTesting - ConcurrentMap<TaskAttemptId, - AtomicReference<TaskAttemptStatus>> getAttemptIdToStatus() { - return attemptIdToStatus; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb44d8e8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java index cef4fd0..715f63d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java @@ -19,7 +19,6 @@ package org.apache.hadoop.mapreduce.v2.app.job.event; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.v2.api.records.Phase; @@ -27,16 +26,17 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent { - private AtomicReference<TaskAttemptStatus> taskAttemptStatusRef; + + private TaskAttemptStatus reportedTaskAttemptStatus; public TaskAttemptStatusUpdateEvent(TaskAttemptId id, - AtomicReference<TaskAttemptStatus> taskAttemptStatusRef) { + TaskAttemptStatus taskAttemptStatus) { super(id, TaskAttemptEventType.TA_UPDATE); - this.taskAttemptStatusRef = taskAttemptStatusRef; + this.reportedTaskAttemptStatus = taskAttemptStatus; } - public AtomicReference<TaskAttemptStatus> getTaskAttemptStatusRef() { - return taskAttemptStatusRef; + public TaskAttemptStatus getReportedTaskAttemptStatus() { + return reportedTaskAttemptStatus; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb44d8e8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 5a7e545..813010d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -33,7 +33,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -1663,7 +1662,6 @@ public abstract class TaskAttemptImpl implements // register it to TaskAttemptListener so that it can start monitoring it. taskAttempt.taskAttemptListener .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID); - //TODO Resolve to host / IP in case of a local address. InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr? NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress()); @@ -1959,20 +1957,15 @@ public abstract class TaskAttemptImpl implements } private static class StatusUpdater - implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { + implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { @SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { - TaskAttemptStatusUpdateEvent statusEvent = - ((TaskAttemptStatusUpdateEvent)event); - - AtomicReference<TaskAttemptStatus> taskAttemptStatusRef = - statusEvent.getTaskAttemptStatusRef(); - + // Status update calls don't really change the state of the attempt. TaskAttemptStatus newReportedStatus = - taskAttemptStatusRef.getAndSet(null); - + ((TaskAttemptStatusUpdateEvent) event) + .getReportedTaskAttemptStatus(); // Now switch the information in the reportedStatus taskAttempt.reportedStatus = newReportedStatus; taskAttempt.reportedStatus.taskState = taskAttempt.getState(); @@ -1981,10 +1974,12 @@ public abstract class TaskAttemptImpl implements taskAttempt.eventHandler.handle (new SpeculatorEvent (taskAttempt.reportedStatus, taskAttempt.clock.getTime())); + taskAttempt.updateProgressSplits(); + //if fetch failures are present, send the fetch failure event to job //this only will happen in reduce attempt type - if (taskAttempt.reportedStatus.fetchFailedMaps != null && + if (taskAttempt.reportedStatus.fetchFailedMaps != null && taskAttempt.reportedStatus.fetchFailedMaps.size() > 0) { taskAttempt.eventHandler.handle(new JobTaskAttemptFetchFailureEvent( taskAttempt.attemptId, taskAttempt.reportedStatus.fetchFailedMaps)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb44d8e8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java index f8a6a9e..d35d1e9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java @@ -31,15 +31,14 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.util.Arrays; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicReference; + +import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.v2.api.records.JobId; -import org.apache.hadoop.mapreduce.v2.api.records.Phase; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; @@ -47,83 +46,15 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.app.job.Job; -import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; -import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.Event; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.SystemClock; -import org.junit.After; -import org.junit.Assert; import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; - -/** - * Tests the behavior of TaskAttemptListenerImpl. - */ -@RunWith(MockitoJUnitRunner.class) public class TestTaskAttemptListenerImpl { - private static final String ATTEMPT1_ID = - "attempt_123456789012_0001_m_000001_0"; - private static final String ATTEMPT2_ID = - "attempt_123456789012_0001_m_000002_0"; - - private static final TaskAttemptId TASKATTEMPTID1 = - TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT1_ID)); - private static final TaskAttemptId TASKATTEMPTID2 = - TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT2_ID)); - - @Mock - private AppContext appCtx; - - @Mock - private JobTokenSecretManager secret; - - @Mock - private RMHeartbeatHandler rmHeartbeatHandler; - - @Mock - private TaskHeartbeatHandler hbHandler; - - @Mock - private Dispatcher dispatcher; - - @Mock - private Task task; - - @SuppressWarnings("rawtypes") - @Mock - private EventHandler<Event> ea; - - @SuppressWarnings("rawtypes") - @Captor - private ArgumentCaptor<Event> eventCaptor; - - private JVMId id; - private WrappedJvmID wid; - private TaskAttemptID attemptID; - private TaskAttemptId attemptId; - private ReduceTaskStatus firstReduceStatus; - private ReduceTaskStatus secondReduceStatus; - private ReduceTaskStatus thirdReduceStatus; - - private MockTaskAttemptListenerImpl listener; - - /** - * Extension of the original TaskAttemptImpl - * for testing purposes - */ - public static class MockTaskAttemptListenerImpl - extends TaskAttemptListenerImpl { + public static class MockTaskAttemptListenerImpl extends TaskAttemptListenerImpl { public MockTaskAttemptListenerImpl(AppContext context, JobTokenSecretManager jobTokenSecretManager, @@ -154,24 +85,26 @@ public class TestTaskAttemptListenerImpl { //Empty } } - - @After - public void after() throws IOException { - if (listener != null) { - listener.close(); - listener = null; - } - } - + @Test (timeout=5000) public void testGetTask() throws IOException { - configureMocks(); - startListener(false); + AppContext appCtx = mock(AppContext.class); + JobTokenSecretManager secret = mock(JobTokenSecretManager.class); + RMHeartbeatHandler rmHeartbeatHandler = + mock(RMHeartbeatHandler.class); + TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); + MockTaskAttemptListenerImpl listener = + new MockTaskAttemptListenerImpl(appCtx, secret, + rmHeartbeatHandler, hbHandler); + Configuration conf = new Configuration(); + listener.init(conf); + listener.start(); + JVMId id = new JVMId("foo",1, true, 1); + WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId()); // Verify ask before registration. //The JVM ID has not been registered yet so we should kill it. JvmContext context = new JvmContext(); - context.jvmId = id; JvmTask result = listener.getTask(context); assertNotNull(result); @@ -179,18 +112,20 @@ public class TestTaskAttemptListenerImpl { // Verify ask after registration but before launch. // Don't kill, should be null. + TaskAttemptId attemptID = mock(TaskAttemptId.class); + Task task = mock(Task.class); //Now put a task with the ID listener.registerPendingTask(task, wid); result = listener.getTask(context); assertNull(result); // Unregister for more testing. - listener.unregister(attemptId, wid); + listener.unregister(attemptID, wid); // Verify ask after registration and launch //Now put a task with the ID listener.registerPendingTask(task, wid); - listener.registerLaunchedTask(attemptId, wid); - verify(hbHandler).register(attemptId); + listener.registerLaunchedTask(attemptID, wid); + verify(hbHandler).register(attemptID); result = listener.getTask(context); assertNotNull(result); assertFalse(result.shouldDie); @@ -201,13 +136,15 @@ public class TestTaskAttemptListenerImpl { assertNotNull(result); assertTrue(result.shouldDie); - listener.unregister(attemptId, wid); + listener.unregister(attemptID, wid); // Verify after unregistration. result = listener.getTask(context); assertNotNull(result); assertTrue(result.shouldDie); + listener.stop(); + // test JVMID JVMId jvmid = JVMId.forName("jvm_001_002_m_004"); assertNotNull(jvmid); @@ -253,11 +190,14 @@ public class TestTaskAttemptListenerImpl { when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn( TypeConverter.fromYarn(empty)); - configureMocks(); + AppContext appCtx = mock(AppContext.class); when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob); - - listener = new MockTaskAttemptListenerImpl( - appCtx, secret, rmHeartbeatHandler, hbHandler) { + JobTokenSecretManager secret = mock(JobTokenSecretManager.class); + RMHeartbeatHandler rmHeartbeatHandler = + mock(RMHeartbeatHandler.class); + final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); + TaskAttemptListenerImpl listener = + new MockTaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) { @Override protected void registerHeartbeatHandler(Configuration conf) { taskHeartbeatHandler = hbHandler; @@ -298,18 +238,20 @@ public class TestTaskAttemptListenerImpl { public void testCommitWindow() throws IOException { SystemClock clock = new SystemClock(); - configureMocks(); - org.apache.hadoop.mapreduce.v2.app.job.Task mockTask = mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class); when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true); Job mockJob = mock(Job.class); when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask); + AppContext appCtx = mock(AppContext.class); when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob); when(appCtx.getClock()).thenReturn(clock); - - listener = new MockTaskAttemptListenerImpl( - appCtx, secret, rmHeartbeatHandler, hbHandler) { + JobTokenSecretManager secret = mock(JobTokenSecretManager.class); + RMHeartbeatHandler rmHeartbeatHandler = + mock(RMHeartbeatHandler.class); + final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); + TaskAttemptListenerImpl listener = + new MockTaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) { @Override protected void registerHeartbeatHandler(Configuration conf) { taskHeartbeatHandler = hbHandler; @@ -327,119 +269,11 @@ public class TestTaskAttemptListenerImpl { verify(mockTask, never()).canCommit(any(TaskAttemptId.class)); // verify commit allowed when RM heartbeat is recent - when(rmHeartbeatHandler.getLastHeartbeatTime()) - .thenReturn(clock.getTime()); + when(rmHeartbeatHandler.getLastHeartbeatTime()).thenReturn(clock.getTime()); canCommit = listener.canCommit(tid); assertTrue(canCommit); verify(mockTask, times(1)).canCommit(any(TaskAttemptId.class)); - } - - @Test - public void testSingleStatusUpdate() - throws IOException, InterruptedException { - configureMocks(); - startListener(true); - listener.statusUpdate(attemptID, firstReduceStatus); - - verify(ea).handle(eventCaptor.capture()); - TaskAttemptStatusUpdateEvent updateEvent = - (TaskAttemptStatusUpdateEvent) eventCaptor.getValue(); - - TaskAttemptStatus status = updateEvent.getTaskAttemptStatusRef().get(); - assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID1)); - assertEquals(1, status.fetchFailedMaps.size()); - assertEquals(Phase.SHUFFLE, status.phase); - } - - @Test - public void testStatusUpdateEventCoalescing() - throws IOException, InterruptedException { - configureMocks(); - startListener(true); - - listener.statusUpdate(attemptID, firstReduceStatus); - listener.statusUpdate(attemptID, secondReduceStatus); - - verify(ea).handle(any(Event.class)); - ConcurrentMap<TaskAttemptId, - AtomicReference<TaskAttemptStatus>> attemptIdToStatus = - listener.getAttemptIdToStatus(); - TaskAttemptStatus status = attemptIdToStatus.get(attemptId).get(); - - assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID1)); - assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID2)); - assertEquals(2, status.fetchFailedMaps.size()); - assertEquals(Phase.SORT, status.phase); - } - - @Test - public void testCoalescedStatusUpdatesCleared() - throws IOException, InterruptedException { - // First two events are coalesced, the third is not - configureMocks(); - startListener(true); - - listener.statusUpdate(attemptID, firstReduceStatus); - listener.statusUpdate(attemptID, secondReduceStatus); - ConcurrentMap<TaskAttemptId, - AtomicReference<TaskAttemptStatus>> attemptIdToStatus = - listener.getAttemptIdToStatus(); - attemptIdToStatus.get(attemptId).set(null); - listener.statusUpdate(attemptID, thirdReduceStatus); - - verify(ea, times(2)).handle(eventCaptor.capture()); - TaskAttemptStatusUpdateEvent updateEvent = - (TaskAttemptStatusUpdateEvent) eventCaptor.getValue(); - - TaskAttemptStatus status = updateEvent.getTaskAttemptStatusRef().get(); - assertNull(status.fetchFailedMaps); - assertEquals(Phase.REDUCE, status.phase); - } - - @Test(expected = IllegalStateException.class) - public void testStatusUpdateFromUnregisteredTask() - throws IOException, InterruptedException{ - configureMocks(); - startListener(false); - - listener.statusUpdate(attemptID, firstReduceStatus); - } - - private void configureMocks() { - firstReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1, - TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.SHUFFLE, - new Counters()); - firstReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT1_ID)); - - secondReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1, - TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.SORT, - new Counters()); - secondReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT2_ID)); - - thirdReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1, - TaskStatus.State.RUNNING, "", "RUNNING", "", - TaskStatus.Phase.REDUCE, new Counters()); - - when(dispatcher.getEventHandler()).thenReturn(ea); - when(appCtx.getEventHandler()).thenReturn(ea); - listener = new MockTaskAttemptListenerImpl(appCtx, secret, - rmHeartbeatHandler, hbHandler); - id = new JVMId("foo", 1, true, 1); - wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId()); - attemptID = new TaskAttemptID("1", 1, TaskType.MAP, 1, 1); - attemptId = TypeConverter.toYarn(attemptID); - } - - private void startListener(boolean registerTask) { - Configuration conf = new Configuration(); - - listener.init(conf); - listener.start(); - - if (registerTask) { - listener.registerPendingTask(task, wid); - listener.registerLaunchedTask(attemptId, wid); - } + listener.stop(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb44d8e8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java index b500712..4e4e2e7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertEquals; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; -import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.TaskCompletionEvent; @@ -426,7 +425,7 @@ public class TestFetchFailure { status.stateString = "OK"; status.taskState = attempt.getState(); TaskAttemptStatusUpdateEvent event = new TaskAttemptStatusUpdateEvent(attempt.getID(), - new AtomicReference<>(status)); + status); app.getContext().getEventHandler().handle(event); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb44d8e8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java index ca3c28c..77f9a09 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.Iterator; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; @@ -104,8 +103,7 @@ public class TestMRClientService { taskAttemptStatus.phase = Phase.MAP; // send the status update app.getContext().getEventHandler().handle( - new TaskAttemptStatusUpdateEvent(attempt.getID(), - new AtomicReference<>(taskAttemptStatus))); + new TaskAttemptStatusUpdateEvent(attempt.getID(), taskAttemptStatus)); //verify that all object are fully populated by invoking RPCs. http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb44d8e8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java index 98bd03e..d2edd19 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.Random; -import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -85,8 +84,7 @@ public class TestSpeculativeExecutionWithMRApp { createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.8, TaskAttemptState.RUNNING); TaskAttemptStatusUpdateEvent event = - new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), - new AtomicReference<>(status)); + new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status); appEventHandler.handle(event); } } @@ -157,8 +155,7 @@ public class TestSpeculativeExecutionWithMRApp { createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.5, TaskAttemptState.RUNNING); TaskAttemptStatusUpdateEvent event = - new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), - new AtomicReference<>(status)); + new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status); appEventHandler.handle(event); } } @@ -183,8 +180,7 @@ public class TestSpeculativeExecutionWithMRApp { TaskAttemptState.RUNNING); speculatedTask = task.getValue(); TaskAttemptStatusUpdateEvent event = - new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), - new AtomicReference<>(status)); + new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status); appEventHandler.handle(event); } } @@ -199,8 +195,7 @@ public class TestSpeculativeExecutionWithMRApp { createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.75, TaskAttemptState.RUNNING); TaskAttemptStatusUpdateEvent event = - new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), - new AtomicReference<>(status)); + new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status); appEventHandler.handle(event); } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org