Author: bikas
Date: Thu Mar 21 22:30:10 2013
New Revision: 1459557
URL: http://svn.apache.org/r1459557
Log:
merge -c r1459555 from trunk to branch-2 for YARN-417. Create AMRMClient
wrapper that provides asynchronous callbacks. (Sandy Ryza via bikas)
Added:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
- copied unchanged from r1459555,
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
- copied unchanged from r1459555,
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1459557&r1=1459556&r2=1459557&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Thu Mar 21
22:30:10 2013
@@ -33,6 +33,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-297. Improve hashCode implementations for PB records. (Xuan Gong via
hitesh)
+ YARN-417. Create AMRMClient wrapper that provides asynchronous callbacks.
+ (Sandy Ryza via bikas)
+
OPTIMIZATIONS
BUG FIXES
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1459557&r1=1459556&r2=1459557&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
Thu Mar 21 22:30:10 2013
@@ -63,12 +63,12 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.client.AMRMClient;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.AMRMClientImpl;
+import org.apache.hadoop.yarn.client.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -147,8 +147,8 @@ public class ApplicationMaster {
private YarnRPC rpc;
// Handle to communicate with the Resource Manager
- private AMRMClient resourceManager;
-
+ private AMRMClientAsync resourceManager;
+
// Application Attempt Id ( combination of attemptId and fail count )
private ApplicationAttemptId appAttemptID;
@@ -169,8 +169,6 @@ public class ApplicationMaster {
// Priority of the request
private int requestPriority;
- // Simple flag to denote whether all works is done
- private boolean appDone = false;
// Counter for completed containers ( complete denotes successful or failed )
private AtomicInteger numCompletedContainers = new AtomicInteger();
// Allocated container count so that we know how many containers has the RM
@@ -201,6 +199,9 @@ public class ApplicationMaster {
// Hardcoded path to shell script in launch container's local env
private final String ExecShellStringPath = "ExecShellScript.sh";
+ private volatile boolean done;
+ private volatile boolean success;
+
// Launch threads
private List<Thread> launchThreads = new ArrayList<Thread>();
@@ -416,226 +417,202 @@ public class ApplicationMaster {
public boolean run() throws YarnRemoteException {
LOG.info("Starting ApplicationMaster");
- // Connect to ResourceManager
- resourceManager = new AMRMClientImpl(appAttemptID);
+ AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
+
+ resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener);
resourceManager.init(conf);
resourceManager.start();
- try {
- // Setup local RPC Server to accept status requests directly from clients
- // TODO need to setup a protocol for client to be able to communicate to
- // the RPC server
- // TODO use the rpc port info to register with the RM for the client to
- // send requests to this app master
-
- // Register self with ResourceManager
- RegisterApplicationMasterResponse response = resourceManager
- .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
- appMasterTrackingUrl);
- // Dump out information about cluster capability as seen by the
- // resource manager
- int minMem = response.getMinimumResourceCapability().getMemory();
- int maxMem = response.getMaximumResourceCapability().getMemory();
- LOG.info("Min mem capabililty of resources in this cluster " + minMem);
- LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
-
- // A resource ask has to be atleast the minimum of the capability of the
- // cluster, the value has to be a multiple of the min value and cannot
- // exceed the max.
- // If it is not an exact multiple of min, the RM will allocate to the
- // nearest multiple of min
- if (containerMemory < minMem) {
- LOG.info("Container memory specified below min threshold of cluster."
- + " Using min value." + ", specified=" + containerMemory + ", min="
- + minMem);
- containerMemory = minMem;
- } else if (containerMemory > maxMem) {
- LOG.info("Container memory specified above max threshold of cluster."
- + " Using max value." + ", specified=" + containerMemory + ", max="
- + maxMem);
- containerMemory = maxMem;
- }
-
- // Setup heartbeat emitter
- // TODO poll RM every now and then with an empty request to let RM know
- // that we are alive
- // The heartbeat interval after which an AM is timed out by the RM is
- // defined by a config setting:
- // RM_AM_EXPIRY_INTERVAL_MS with default defined by
- // DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
- // The allocate calls to the RM count as heartbeats so, for now,
- // this additional heartbeat emitter is not required.
-
- // Setup ask for containers from RM
- // Send request for containers to RM
- // Until we get our fully allocated quota, we keep on polling RM for
- // containers
- // Keep looping until all the containers are launched and shell script
- // executed on them ( regardless of success/failure).
-
- int loopCounter = -1;
-
- while (numCompletedContainers.get() < numTotalContainers && !appDone) {
- loopCounter++;
-
- // log current state
- LOG.info("Current application state: loop=" + loopCounter
- + ", appDone=" + appDone + ", total=" + numTotalContainers
- + ", requested=" + numRequestedContainers + ", completed="
- + numCompletedContainers + ", failed=" + numFailedContainers
- + ", currentAllocated=" + numAllocatedContainers);
-
- // Sleep before each loop when asking RM for containers
- // to avoid flooding RM with spurious requests when it
- // need not have any available containers
- // Sleeping for 1000 ms.
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- LOG.info("Sleep interrupted " + e.getMessage());
- }
+ // Setup local RPC Server to accept status requests directly from clients
+ // TODO need to setup a protocol for client to be able to communicate to
+ // the RPC server
+ // TODO use the rpc port info to register with the RM for the client to
+ // send requests to this app master
+
+ // Register self with ResourceManager
+ // This will start heartbeating to the RM
+ RegisterApplicationMasterResponse response = resourceManager
+ .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+ appMasterTrackingUrl);
+ // Dump out information about cluster capability as seen by the
+ // resource manager
+ int minMem = response.getMinimumResourceCapability().getMemory();
+ int maxMem = response.getMaximumResourceCapability().getMemory();
+ LOG.info("Min mem capabililty of resources in this cluster " + minMem);
+ LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+ // A resource ask has to be atleast the minimum of the capability of the
+ // cluster, the value has to be a multiple of the min value and cannot
+ // exceed the max.
+ // If it is not an exact multiple of min, the RM will allocate to the
+ // nearest multiple of min
+ if (containerMemory < minMem) {
+ LOG.info("Container memory specified below min threshold of cluster."
+ + " Using min value." + ", specified=" + containerMemory + ", min="
+ + minMem);
+ containerMemory = minMem;
+ } else if (containerMemory > maxMem) {
+ LOG.info("Container memory specified above max threshold of cluster."
+ + " Using max value." + ", specified=" + containerMemory + ", max="
+ + maxMem);
+ containerMemory = maxMem;
+ }
- // No. of containers to request
- // For the first loop, askCount will be equal to total containers needed
- // From that point on, askCount will always be 0 as current
- // implementation does not change its ask on container failures.
- int askCount = numTotalContainers - numRequestedContainers.get();
- numRequestedContainers.addAndGet(askCount);
-
- if (askCount > 0) {
- ContainerRequest containerAsk = setupContainerAskForRM(askCount);
- resourceManager.addContainerRequest(containerAsk);
- }
- // Send the request to RM
- LOG.info("Asking RM for containers" + ", askCount=" + askCount);
- AllocateResponse allocResp = sendContainerAskToRM();
-
- // Retrieve list of allocated containers from the response
- List<Container> allocatedContainers =
- allocResp.getAllocatedContainers();
- LOG.info("Got response from RM for container ask, allocatedCnt="
- + allocatedContainers.size());
- numAllocatedContainers.addAndGet(allocatedContainers.size());
- for (Container allocatedContainer : allocatedContainers) {
- LOG.info("Launching shell command on a new container."
- + ", containerId=" + allocatedContainer.getId()
- + ", containerNode=" + allocatedContainer.getNodeId().getHost()
- + ":" + allocatedContainer.getNodeId().getPort()
- + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
- + ", containerState" + allocatedContainer.getState()
- + ", containerResourceMemory"
- + allocatedContainer.getResource().getMemory());
- // + ", containerToken"
- // +allocatedContainer.getContainerToken().getIdentifier().toString());
-
- LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
- allocatedContainer);
- Thread launchThread = new Thread(runnableLaunchContainer);
-
- // launch and start the container on a separate thread to keep
- // the main thread unblocked
- // as all containers may not be allocated at one go.
- launchThreads.add(launchThread);
- launchThread.start();
- }
+ // Setup ask for containers from RM
+ // Send request for containers to RM
+ // Until we get our fully allocated quota, we keep on polling RM for
+ // containers
+ // Keep looping until all the containers are launched and shell script
+ // executed on them ( regardless of success/failure).
+ ContainerRequest containerAsk = setupContainerAskForRM(numTotalContainers);
+ resourceManager.addContainerRequest(containerAsk);
+ numRequestedContainers.set(numTotalContainers);
- // Check what the current available resources in the cluster are
- // TODO should we do anything if the available resources are not enough?
- Resource availableResources = allocResp.getAvailableResources();
- LOG.info("Current available resources in the cluster "
- + availableResources);
-
- // Check the completed containers
- List<ContainerStatus> completedContainers = allocResp
- .getCompletedContainersStatuses();
- LOG.info("Got response from RM for container ask, completedCnt="
- + completedContainers.size());
- for (ContainerStatus containerStatus : completedContainers) {
- LOG.info("Got container status for containerID="
- + containerStatus.getContainerId() + ", state="
- + containerStatus.getState() + ", exitStatus="
- + containerStatus.getExitStatus() + ", diagnostics="
- + containerStatus.getDiagnostics());
-
- // non complete containers should not be here
- assert (containerStatus.getState() == ContainerState.COMPLETE);
-
- // increment counters for completed/failed containers
- int exitStatus = containerStatus.getExitStatus();
- if (0 != exitStatus) {
- // container failed
- if (-100 != exitStatus) {
- // shell script failed
- // counts as completed
- numCompletedContainers.incrementAndGet();
- numFailedContainers.incrementAndGet();
- } else {
- // something else bad happened
- // app job did not complete for some reason
- // we should re-try as the container was lost for some reason
- numAllocatedContainers.decrementAndGet();
- numRequestedContainers.decrementAndGet();
- // we do not need to release the container as it would be done
- // by the RM/CM.
- }
- } else {
- // nothing to do
- // container completed successfully
+ while (!done) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException ex) {}
+ }
+ finish();
+
+ return success;
+ }
+
+ private void finish() {
+ // Join all launched threads
+ // needed for when we time out
+ // and we need to release containers
+ for (Thread launchThread : launchThreads) {
+ try {
+ launchThread.join(10000);
+ } catch (InterruptedException e) {
+ LOG.info("Exception thrown in thread join: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ // When the application completes, it should send a finish application
+ // signal to the RM
+ LOG.info("Application completed. Signalling finish to RM");
+
+ FinalApplicationStatus appStatus;
+ String appMessage = null;
+ success = true;
+ if (numFailedContainers.get() == 0) {
+ appStatus = FinalApplicationStatus.SUCCEEDED;
+ } else {
+ appStatus = FinalApplicationStatus.FAILED;
+ appMessage = "Diagnostics." + ", total=" + numTotalContainers
+ + ", completed=" + numCompletedContainers.get() + ", allocated="
+ + numAllocatedContainers.get() + ", failed="
+ + numFailedContainers.get();
+ success = false;
+ }
+ try {
+ resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
+ } catch (YarnRemoteException ex) {
+ LOG.error("Failed to unregister application", ex);
+ }
+
+ done = true;
+ resourceManager.stop();
+ }
+
+ private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ @Override
public void onContainersCompleted(List<ContainerStatus> completedContainers) {
+ LOG.info("Got response from RM for container ask, completedCnt="
+ + completedContainers.size());
+ for (ContainerStatus containerStatus : completedContainers) {
+ LOG.info("Got container status for containerID="
+ + containerStatus.getContainerId() + ", state="
+ + containerStatus.getState() + ", exitStatus="
+ + containerStatus.getExitStatus() + ", diagnostics="
+ + containerStatus.getDiagnostics());
+
+ // non complete containers should not be here
+ assert (containerStatus.getState() == ContainerState.COMPLETE);
+
+ // increment counters for completed/failed containers
+ int exitStatus = containerStatus.getExitStatus();
+ if (0 != exitStatus) {
+ // container failed
+ if (YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS != exitStatus) {
+ // shell script failed
+ // counts as completed
numCompletedContainers.incrementAndGet();
- LOG.info("Container completed successfully." + ", containerId="
- + containerStatus.getContainerId());
+ numFailedContainers.incrementAndGet();
+ } else {
+ // container was killed by framework, possibly preempted
+ // we should re-try as the container was lost for some reason
+ numAllocatedContainers.decrementAndGet();
+ numRequestedContainers.decrementAndGet();
+ // we do not need to release the container as it would be done
+ // by the RM
}
+ } else {
+ // nothing to do
+ // container completed successfully
+ numCompletedContainers.incrementAndGet();
+ LOG.info("Container completed successfully." + ", containerId="
+ + containerStatus.getContainerId());
}
- if (numCompletedContainers.get() == numTotalContainers) {
- appDone = true;
- }
-
- LOG.info("Current application state: loop=" + loopCounter
- + ", appDone=" + appDone + ", total=" + numTotalContainers
- + ", requested=" + numRequestedContainers + ", completed="
- + numCompletedContainers + ", failed=" + numFailedContainers
- + ", currentAllocated=" + numAllocatedContainers);
-
- // TODO
- // Add a timeout handling layer
- // for misbehaving shell commands
}
-
- // Join all launched threads
- // needed for when we time out
- // and we need to release containers
- for (Thread launchThread : launchThreads) {
- try {
- launchThread.join(10000);
- } catch (InterruptedException e) {
- LOG.info("Exception thrown in thread join: " + e.getMessage());
- e.printStackTrace();
- }
+
+ // ask for more containers if any failed
+ int askCount = numTotalContainers - numRequestedContainers.get();
+ numRequestedContainers.addAndGet(askCount);
+
+ if (askCount > 0) {
+ ContainerRequest containerAsk = setupContainerAskForRM(askCount);
+ resourceManager.addContainerRequest(containerAsk);
+ }
+
+ // set progress to deliver to RM on next heartbeat
+ float progress = (float) numCompletedContainers.get()
+ / numTotalContainers;
+ resourceManager.setProgress(progress);
+
+ if (numCompletedContainers.get() == numTotalContainers) {
+ done = true;
}
+ }
- // When the application completes, it should send a finish application
- // signal to the RM
- LOG.info("Application completed. Signalling finish to RM");
-
- FinalApplicationStatus appStatus;
- String appMessage = null;
- boolean isSuccess = true;
- if (numFailedContainers.get() == 0) {
- appStatus = FinalApplicationStatus.SUCCEEDED;
- } else {
- appStatus = FinalApplicationStatus.FAILED;
- appMessage = "Diagnostics." + ", total=" + numTotalContainers
- + ", completed=" + numCompletedContainers.get() + ", allocated="
- + numAllocatedContainers.get() + ", failed="
- + numFailedContainers.get();
- isSuccess = false;
+ @Override
+ public void onContainersAllocated(List<Container> allocatedContainers) {
+ LOG.info("Got response from RM for container ask, allocatedCnt="
+ + allocatedContainers.size());
+ numAllocatedContainers.addAndGet(allocatedContainers.size());
+ for (Container allocatedContainer : allocatedContainers) {
+ LOG.info("Launching shell command on a new container."
+ + ", containerId=" + allocatedContainer.getId()
+ + ", containerNode=" + allocatedContainer.getNodeId().getHost()
+ + ":" + allocatedContainer.getNodeId().getPort()
+ + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
+ + ", containerState" + allocatedContainer.getState()
+ + ", containerResourceMemory"
+ + allocatedContainer.getResource().getMemory());
+ // + ", containerToken"
+ // +allocatedContainer.getContainerToken().getIdentifier().toString());
+
+ LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
+ allocatedContainer);
+ Thread launchThread = new Thread(runnableLaunchContainer);
+
+ // launch and start the container on a separate thread to keep
+ // the main thread unblocked
+ // as all containers may not be allocated at one go.
+ launchThreads.add(launchThread);
+ launchThread.start();
}
- resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
- return isSuccess;
- } finally {
- resourceManager.stop();
}
+
+ @Override
+ public void onRebootRequest() {}
+
+ @Override
+ public void onNodesUpdated(List<NodeReport> updatedNodes) {}
}
/**
@@ -811,21 +788,4 @@ public class ApplicationMaster {
LOG.info("Requested container ask: " + request.toString());
return request;
}
-
- /**
- * Ask RM to allocate given no. of containers to this Application Master
- *
- * @param requestedContainers Containers to ask for from RM
- * @return Response from RM to AM with allocated containers
- * @throws YarnRemoteException
- */
- private AllocateResponse sendContainerAskToRM() throws YarnRemoteException {
- float progressIndicator = (float) numCompletedContainers.get()
- / numTotalContainers;
-
- LOG.info("Sending request to RM for containers" + ", progress="
- + progressIndicator);
-
- return resourceManager.allocate(progressIndicator);
- }
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java?rev=1459557&r1=1459556&r2=1459557&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
Thu Mar 21 22:30:10 2013
@@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -404,4 +405,21 @@ public class BuilderUtils {
allocateRequest.addAllReleases(containersToBeReleased);
return allocateRequest;
}
+
+ public static AllocateResponse newAllocateResponse(int responseId,
+ List<ContainerStatus> completedContainers,
+ List<Container> allocatedContainers, List<NodeReport> updatedNodes,
+ Resource availResources, boolean reboot, int numClusterNodes) {
+ AllocateResponse response = recordFactory
+ .newRecordInstance(AllocateResponse.class);
+ response.setNumClusterNodes(numClusterNodes);
+ response.setResponseId(responseId);
+ response.setCompletedContainersStatuses(completedContainers);
+ response.setAllocatedContainers(allocatedContainers);
+ response.setUpdatedNodes(updatedNodes);
+ response.setAvailableResources(availResources);
+ response.setReboot(reboot);
+
+ return response;
+ }
}