Author: bikas
Date: Wed Jul 24 22:16:37 2013
New Revision: 1506752
URL: http://svn.apache.org/r1506752
Log:
Merge r1506750 from trunk to branch-2 for YARN-875. Application can hang if
AMRMClientAsync callback thread has exception (Xuan Gong via bikas)
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1506752&r1=1506751&r2=1506752&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Jul 24 22:16:37 2013
@@ -726,6 +726,9 @@ Release 2.1.0-beta - 2013-07-02
YARN-873. YARNClient.getApplicationReport(unknownAppId) returns a null
report (Xuan Gong via bikas)
+ YARN-875. Application can hang if AMRMClientAsync callback thread has
+ exception (Xuan Gong via bikas)
+
BREAKDOWN OF HADOOP-8562/YARN-191 SUBTASKS AND RELATED JIRAS
YARN-158. Yarn creating package-info.java must not depend on sh.
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1506752&r1=1506751&r2=1506752&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
Wed Jul 24 22:16:37 2013
@@ -649,8 +649,9 @@ public class ApplicationMaster {
}
@Override
- public void onError(Exception e) {
+ public void onError(Throwable e) {
done = true;
+ resourceManager.stop();
}
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java?rev=1506752&r1=1506751&r2=1506752&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
Wed Jul 24 22:16:37 2013
@@ -220,6 +220,13 @@ extends AbstractService {
public float getProgress();
- public void onError(Exception e);
+ /**
+ * Called when error comes from RM communications as well as from errors in
+ * the callback itself from the app. Calling
+ * stop() is the recommended action.
+ *
+ * @param e
+ */
+ public void onError(Throwable e);
}
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java?rev=1506752&r1=1506751&r2=1506752&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
Wed Jul 24 22:16:37 2013
@@ -217,7 +217,7 @@ extends AMRMClientAsync<T> {
// synchronization ensures we don't send heartbeats after unregistering
synchronized (unregisterHeartbeatLock) {
if (!keepRunning) {
- break;
+ return;
}
try {
@@ -227,13 +227,13 @@ extends AMRMClientAsync<T> {
savedException = ex;
// interrupt handler thread in case it waiting on the queue
handlerThread.interrupt();
- break;
+ return;
} catch (IOException e) {
LOG.error("IO exception on heartbeat", e);
savedException = e;
// interrupt handler thread in case it waiting on the queue
handlerThread.interrupt();
- break;
+ return;
}
}
if (response != null) {
@@ -266,51 +266,60 @@ extends AMRMClientAsync<T> {
}
public void run() {
- while (keepRunning) {
- AllocateResponse response;
+ while (true) {
+ if (!keepRunning) {
+ return;
+ }
try {
+ AllocateResponse response;
if(savedException != null) {
LOG.error("Stopping callback due to: ", savedException);
handler.onError(savedException);
- break;
+ return;
+ }
+ try {
+ response = responseQueue.take();
+ } catch (InterruptedException ex) {
+ LOG.info("Interrupted while waiting for queue", ex);
+ continue;
}
- response = responseQueue.take();
- } catch (InterruptedException ex) {
- LOG.info("Interrupted while waiting for queue", ex);
- continue;
- }
- if (response.getAMCommand() != null) {
- switch(response.getAMCommand()) {
- case AM_RESYNC:
- case AM_SHUTDOWN:
- handler.onShutdownRequest();
- LOG.info("Shutdown requested. Stopping callback.");
- return;
- default:
- String msg =
- "Unhandled value of AMCommand: " + response.getAMCommand();
- LOG.error(msg);
- throw new YarnRuntimeException(msg);
+ if (response.getAMCommand() != null) {
+ switch(response.getAMCommand()) {
+ case AM_RESYNC:
+ case AM_SHUTDOWN:
+ handler.onShutdownRequest();
+ LOG.info("Shutdown requested. Stopping callback.");
+ return;
+ default:
+ String msg =
+ "Unhandled value of RM AMCommand: " + response.getAMCommand();
+ LOG.error(msg);
+ throw new YarnRuntimeException(msg);
+ }
+ }
+ List<NodeReport> updatedNodes = response.getUpdatedNodes();
+ if (!updatedNodes.isEmpty()) {
+ handler.onNodesUpdated(updatedNodes);
}
- }
- List<NodeReport> updatedNodes = response.getUpdatedNodes();
- if (!updatedNodes.isEmpty()) {
- handler.onNodesUpdated(updatedNodes);
- }
-
- List<ContainerStatus> completed =
- response.getCompletedContainersStatuses();
- if (!completed.isEmpty()) {
- handler.onContainersCompleted(completed);
- }
- List<Container> allocated = response.getAllocatedContainers();
- if (!allocated.isEmpty()) {
- handler.onContainersAllocated(allocated);
+ List<ContainerStatus> completed =
+ response.getCompletedContainersStatuses();
+ if (!completed.isEmpty()) {
+ handler.onContainersCompleted(completed);
+ }
+
+ List<Container> allocated = response.getAllocatedContainers();
+ if (!allocated.isEmpty()) {
+ handler.onContainersAllocated(allocated);
+ }
+
+ progress = handler.getProgress();
+ } catch (Throwable ex) {
+ handler.onError(ex);
+ // re-throw exception to end the thread
+ throw new YarnRuntimeException(ex);
}
-
- progress = handler.getProgress();
}
}
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java?rev=1506752&r1=1506751&r2=1506752&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
Wed Jul 24 22:16:37 2013
@@ -21,10 +21,12 @@ package org.apache.hadoop.yarn.client.ap
import static org.mockito.Matchers.anyFloat;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.util.ArrayList;
@@ -54,6 +56,7 @@ import org.apache.hadoop.yarn.client.api
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -264,13 +267,13 @@ public class TestAMRMClientAsync {
AMRMClientAsync<ContainerRequest> asyncClient =
AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
- callbackHandler.registerAsyncClient(asyncClient);
+ callbackHandler.asynClient = asyncClient;
asyncClient.init(conf);
asyncClient.start();
synchronized (callbackHandler.notifier) {
asyncClient.registerApplicationMaster("localhost", 1234, null);
- while(callbackHandler.stop == false) {
+ while(callbackHandler.notify == false) {
try {
callbackHandler.notifier.wait();
} catch (InterruptedException e) {
@@ -280,6 +283,65 @@ public class TestAMRMClientAsync {
}
}
+ void runCallBackThrowOutException(TestCallbackHandler2 callbackHandler) throws
+ InterruptedException, YarnException, IOException {
+ Configuration conf = new Configuration();
+ @SuppressWarnings("unchecked")
+ AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+
+ List<ContainerStatus> completed = Arrays.asList(
+ ContainerStatus.newInstance(newContainerId(0, 0, 0, 0),
+ ContainerState.COMPLETE, "", 0));
+ final AllocateResponse response = createAllocateResponse(completed,
+ new ArrayList<Container>(), null);
+
+ when(client.allocate(anyFloat())).thenReturn(response);
+ AMRMClientAsync<ContainerRequest> asyncClient =
+ AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
+ callbackHandler.asynClient = asyncClient;
+ callbackHandler.throwOutException = true;
+ asyncClient.init(conf);
+ asyncClient.start();
+
+ // call register and wait for error callback and stop
+ synchronized (callbackHandler.notifier) {
+ asyncClient.registerApplicationMaster("localhost", 1234, null);
+ while(callbackHandler.notify == false) {
+ try {
+ callbackHandler.notifier.wait();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ // verify error invoked
+ verify(callbackHandler, times(0)).getProgress();
+ verify(callbackHandler, times(1)).onError(any(Exception.class));
+ // sleep to wait for a few heartbeat calls that can trigger callbacks
+ Thread.sleep(50);
+ // verify no more invocations after the first one.
+ // ie. callback thread has stopped
+ verify(callbackHandler, times(0)).getProgress();
+ verify(callbackHandler, times(1)).onError(any(Exception.class));
+ }
+
+ @Test (timeout = 5000)
+ public void testCallBackThrowOutException() throws YarnException,
+ IOException, InterruptedException {
+ // test exception in callback with app calling stop() on app.onError()
+ TestCallbackHandler2 callbackHandler = spy(new TestCallbackHandler2());
+ runCallBackThrowOutException(callbackHandler);
+ }
+
+ @Test (timeout = 5000)
+ public void testCallBackThrowOutExceptionNoStop() throws YarnException,
+ IOException, InterruptedException {
+ // test exception in callback with app not calling stop() on app.onError()
+ TestCallbackHandler2 callbackHandler = spy(new TestCallbackHandler2());
+ callbackHandler.stop = false;
+ runCallBackThrowOutException(callbackHandler);
+ }
+
private AllocateResponse createAllocateResponse(
List<ContainerStatus> completed, List<Container> allocated,
List<NMToken> nmTokens) {
@@ -378,8 +440,8 @@ public class TestAMRMClientAsync {
}
@Override
- public void onError(Exception e) {
- savedException = e;
+ public void onError(Throwable e) {
+ savedException = new Exception(e.getMessage());
synchronized (notifier) {
notifier.notifyAll();
}
@@ -390,10 +452,16 @@ public class TestAMRMClientAsync {
Object notifier = new Object();
@SuppressWarnings("rawtypes")
AMRMClientAsync asynClient;
- boolean stop = false;
+ boolean stop = true;
+ boolean notify = false;
+ boolean throwOutException = false;
@Override
- public void onContainersCompleted(List<ContainerStatus> statuses) {}
+ public void onContainersCompleted(List<ContainerStatus> statuses) {
+ if (throwOutException) {
+ throw new YarnRuntimeException("Exception from callback handler");
+ }
+ }
@Override
public void onContainersAllocated(List<Container> containers) {}
@@ -406,20 +474,24 @@ public class TestAMRMClientAsync {
@Override
public float getProgress() {
- asynClient.stop();
- stop = true;
- synchronized (notifier) {
- notifier.notifyAll();
- }
+ callStopAndNotify();
return 0;
}
@Override
- public void onError(Exception e) {}
+ public void onError(Throwable e) {
+ Assert.assertEquals(e.getMessage(), "Exception from callback handler");
+ callStopAndNotify();
+ }
- public void registerAsyncClient(
- AMRMClientAsync<ContainerRequest> asyncClient) {
- this.asynClient = asyncClient;
+ void callStopAndNotify() {
+ if(stop) {
+ asynClient.stop();
+ }
+ notify = true;
+ synchronized (notifier) {
+ notifier.notifyAll();
+ }
}
}
}