Author: bikas
Date: Sat Jul 13 23:30:20 2013
New Revision: 1502914
URL: http://svn.apache.org/r1502914
Log:
YARN-763. AMRMClientAsync should stop heartbeating after receiving shutdown
from RM (Xuan Gong via bikas)
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1502914&r1=1502913&r2=1502914&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Sat Jul 13 23:30:20 2013
@@ -686,6 +686,9 @@ Release 2.1.0-beta - 2013-07-02
YARN-541. getAllocatedContainers() is not returning all the allocated
containers (bikas)
+ YARN-763. AMRMClientAsync should stop heartbeating after receiving
+ shutdown from RM (Xuan Gong via bikas)
+
BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS
YARN-158. Yarn creating package-info.java must not depend on sh.
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java?rev=1502914&r1=1502913&r2=1502914&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java Sat Jul 13 23:30:20 2013
@@ -31,6 +31,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -92,6 +93,7 @@ extends AMRMClientAsync<T> {
@Override
protected void serviceStart() throws Exception {
+ handlerThread.setDaemon(true);
handlerThread.start();
client.start();
super.serviceStart();
@@ -99,27 +101,19 @@ extends AMRMClientAsync<T> {
/**
* Tells the heartbeat and handler threads to stop and waits for them to
- * terminate. Calling this method from the callback handler thread would cause
- * deadlock, and thus should be avoided.
+ * terminate.
*/
@Override
protected void serviceStop() throws Exception {
- if (Thread.currentThread() == handlerThread) {
- throw new YarnRuntimeException("Cannot call stop from callback handler thread!");
- }
keepRunning = false;
+ heartbeatThread.interrupt();
try {
heartbeatThread.join();
} catch (InterruptedException ex) {
LOG.error("Error joining with heartbeat thread", ex);
}
client.stop();
- try {
- handlerThread.interrupt();
- handlerThread.join();
- } catch (InterruptedException ex) {
- LOG.error("Error joining with hander thread", ex);
- }
+ handlerThread.interrupt();
super.serviceStop();
}
@@ -248,6 +242,10 @@ extends AMRMClientAsync<T> {
while (true) {
try {
responseQueue.put(response);
+ if (response.getAMCommand() == AMCommand.AM_RESYNC
+ || response.getAMCommand() == AMCommand.AM_SHUTDOWN) {
+ return;
+ }
break;
} catch (InterruptedException ex) {
LOG.info("Interrupted while waiting to put on response queue", ex);
@@ -285,24 +283,18 @@ extends AMRMClientAsync<T> {
}
if (response.getAMCommand() != null) {
- boolean stop = false;
switch(response.getAMCommand()) {
case AM_RESYNC:
case AM_SHUTDOWN:
handler.onShutdownRequest();
LOG.info("Shutdown requested. Stopping callback.");
- stop = true;
- break;
+ return;
default:
String msg =
"Unhandled value of AMCommand: " + response.getAMCommand();
LOG.error(msg);
throw new YarnRuntimeException(msg);
}
- if(stop) {
- // should probably stop heartbeating also YARN-763
- break;
- }
}
List<NodeReport> updatedNodes = response.getUpdatedNodes();
if (!updatedNodes.isEmpty()) {
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java?rev=1502914&r1=1502913&r2=1502914&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java Sat Jul 13 23:30:20 2013
@@ -23,7 +23,10 @@ import static org.mockito.Matchers.anyIn
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -218,6 +221,65 @@ public class TestAMRMClientAsync {
Assert.assertTrue(callbackHandler.callbackCount == 0);
}
+ @Test (timeout = 10000)
+ public void testAMRMClientAsyncShutDown() throws Exception {
+ Configuration conf = new Configuration();
+ TestCallbackHandler callbackHandler = new TestCallbackHandler();
+ @SuppressWarnings("unchecked")
+ AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+
+ final AllocateResponse shutDownResponse = createAllocateResponse(
+ new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
+ shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN);
+ when(client.allocate(anyFloat())).thenReturn(shutDownResponse);
+
+ AMRMClientAsync<ContainerRequest> asyncClient =
+ AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler);
+ asyncClient.init(conf);
+ asyncClient.start();
+
+ asyncClient.registerApplicationMaster("localhost", 1234, null);
+
+ Thread.sleep(50);
+
+ verify(client, times(1)).allocate(anyFloat());
+ asyncClient.stop();
+ }
+
+ @Test (timeout = 5000)
+ public void testCallAMRMClientAsyncStopFromCallbackHandler()
+ throws YarnException, IOException, InterruptedException {
+ Configuration conf = new Configuration();
+ TestCallbackHandler2 callbackHandler = new TestCallbackHandler2();
+ @SuppressWarnings("unchecked")
+ AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+
+ List<ContainerStatus> completed = Arrays.asList(
+ ContainerStatus.newInstance(newContainerId(0, 0, 0, 0),
+ ContainerState.COMPLETE, "", 0));
+ final AllocateResponse response = createAllocateResponse(completed,
+ new ArrayList<Container>(), null);
+
+ when(client.allocate(anyFloat())).thenReturn(response);
+
+ AMRMClientAsync<ContainerRequest> asyncClient =
+ AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
+ callbackHandler.registerAsyncClient(asyncClient);
+ asyncClient.init(conf);
+ asyncClient.start();
+
+ synchronized (callbackHandler.notifier) {
+ asyncClient.registerApplicationMaster("localhost", 1234, null);
+ while(callbackHandler.stop == false) {
+ try {
+ callbackHandler.notifier.wait();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
private AllocateResponse createAllocateResponse(
List<ContainerStatus> completed, List<Container> allocated,
List<NMToken> nmTokens) {
@@ -323,4 +385,41 @@ public class TestAMRMClientAsync {
}
}
}
+
+ private class TestCallbackHandler2 implements AMRMClientAsync.CallbackHandler {
+ Object notifier = new Object();
+ @SuppressWarnings("rawtypes")
+ AMRMClientAsync asynClient;
+ boolean stop = false;
+
+ @Override
+ public void onContainersCompleted(List<ContainerStatus> statuses) {}
+
+ @Override
+ public void onContainersAllocated(List<Container> containers) {}
+
+ @Override
+ public void onShutdownRequest() {}
+
+ @Override
+ public void onNodesUpdated(List<NodeReport> updatedNodes) {}
+
+ @Override
+ public float getProgress() {
+ asynClient.stop();
+ stop = true;
+ synchronized (notifier) {
+ notifier.notifyAll();
+ }
+ return 0;
+ }
+
+ @Override
+ public void onError(Exception e) {}
+
+ public void registerAsyncClient(
+ AMRMClientAsync<ContainerRequest> asyncClient) {
+ this.asynClient = asyncClient;
+ }
+ }
}