Author: zjshen
Date: Sat Mar 22 04:37:46 2014
New Revision: 1580165
URL: http://svn.apache.org/r1580165
Log:
YARN-1577. Made UnmanagedAMLauncher do launchAM after the attempt reaches the
LAUNCHED state. Contributed by Jian He.
svn merge --ignore-ancestry -c 1580164 ../../trunk/
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1580165&r1=1580164&r2=1580165&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Sat Mar 22 04:37:46 2014
@@ -543,6 +543,9 @@ Release 2.4.0 - UNRELEASED
YARN-1776. Fixed DelegationToken renewal to survive RM failover. (Zhijie
Shen via jianhe)
+ YARN-1577. Made UnmanagedAMLauncher do launchAM after the attempt reaches
+ the LAUNCHED state. (Jian He via zjshen)
+
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java?rev=1580165&r1=1580164&r2=1580165&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java Sat Mar 22 04:37:46 2014
@@ -44,6 +44,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -77,7 +79,7 @@ public class UnmanagedAMLauncher {
private Configuration conf;
// Handle to talk to the Resource Manager/Applications Manager
- private YarnClient rmClient;
+ protected YarnClient rmClient;
// Application master specific info to register a new Application with RM/ASM
private String appName = "";
@@ -92,6 +94,7 @@ public class UnmanagedAMLauncher {
private volatile boolean amCompleted = false;
+ private static final long AM_STATE_WAIT_TIMEOUT_MS = 10000;
/**
* @param args
* Command line arguments
@@ -173,12 +176,6 @@ public class UnmanagedAMLauncher {
public void launchAM(ApplicationAttemptId attemptId)
throws IOException, YarnException {
- ApplicationReport report =
- rmClient.getApplicationReport(attemptId.getApplicationId());
- if (report.getYarnApplicationState() != YarnApplicationState.ACCEPTED) {
- throw new YarnException(
- "Umanaged AM must be in ACCEPTED state before launching");
- }
Credentials credentials = new Credentials();
Token<AMRMTokenIdentifier> token =
rmClient.getAMRMToken(attemptId.getApplicationId());
@@ -338,20 +335,27 @@ public class UnmanagedAMLauncher {
// Submit the application to the applications manager
LOG.info("Submitting application to ASM");
rmClient.submitApplication(appContext);
-
- // Monitor the application to wait for launch state
- ApplicationReport appReport = monitorApplication(appId,
- EnumSet.of(YarnApplicationState.ACCEPTED));
- ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
- LOG.info("Launching application with id: " + attemptId);
-
- // launch AM
- launchAM(attemptId);
-
- // Monitor the application for end state
- appReport = monitorApplication(appId, EnumSet.of(
- YarnApplicationState.KILLED, YarnApplicationState.FAILED,
- YarnApplicationState.FINISHED));
+
+ ApplicationReport appReport =
+ monitorApplication(appId, EnumSet.of(YarnApplicationState.ACCEPTED,
+ YarnApplicationState.KILLED, YarnApplicationState.FAILED,
+ YarnApplicationState.FINISHED));
+
+ if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
+ // Monitor the application attempt to wait for launch state
+ ApplicationAttemptReport attemptReport =
+ monitorCurrentAppAttempt(appId,
+ YarnApplicationAttemptState.LAUNCHED);
+ ApplicationAttemptId attemptId =
+ attemptReport.getApplicationAttemptId();
+ LOG.info("Launching AM with application attempt id " + attemptId);
+ // launch AM
+ launchAM(attemptId);
+ // Monitor the application for end state
+ appReport =
+ monitorApplication(appId, EnumSet.of(YarnApplicationState.KILLED,
+ YarnApplicationState.FAILED, YarnApplicationState.FINISHED));
+ }
YarnApplicationState appState = appReport.getYarnApplicationState();
FinalApplicationStatus appStatus = appReport.getFinalApplicationStatus();
@@ -376,6 +380,43 @@ public class UnmanagedAMLauncher {
}
}
+ private ApplicationAttemptReport monitorCurrentAppAttempt(
+ ApplicationId appId, YarnApplicationAttemptState attemptState)
+ throws YarnException, IOException {
+ long startTime = System.currentTimeMillis();
+ ApplicationAttemptId attemptId = null;
+ while (true) {
+ if (attemptId == null) {
+ attemptId =
+ rmClient.getApplicationReport(appId)
+ .getCurrentApplicationAttemptId();
+ }
+ ApplicationAttemptReport attemptReport = null;
+ if (attemptId != null) {
+ attemptReport = rmClient.getApplicationAttemptReport(attemptId);
+ if (attemptState.equals(attemptReport.getYarnApplicationAttemptState())) {
+ return attemptReport;
+ }
+ }
+ LOG.info("Current attempt state of " + appId + " is " + (attemptReport == null
+ ? " N/A " : attemptReport.getYarnApplicationAttemptState())
+ + ", waiting for current attempt to reach " + attemptState);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting for current attempt of " + appId
+ + " to reach " + attemptState);
+ }
+ if (System.currentTimeMillis() - startTime > AM_STATE_WAIT_TIMEOUT_MS) {
+ String errmsg =
+ "Timeout for waiting current attempt of " + appId + " to reach "
+ + attemptState;
+ LOG.error(errmsg);
+ throw new RuntimeException(errmsg);
+ }
+ }
+ }
+
/**
* Monitor the submitted application for completion. Kill application if time
* expires.
@@ -391,7 +432,6 @@ public class UnmanagedAMLauncher {
IOException {
long foundAMCompletedTime = 0;
- final int timeToWaitMS = 10000;
StringBuilder expectedFinalState = new StringBuilder();
boolean first = true;
for (YarnApplicationState state : finalState) {
@@ -438,8 +478,8 @@ public class UnmanagedAMLauncher {
if (foundAMCompletedTime == 0) {
foundAMCompletedTime = System.currentTimeMillis();
} else if ((System.currentTimeMillis() - foundAMCompletedTime)
- > timeToWaitMS) {
- LOG.warn("Waited " + timeToWaitMS/1000
+ > AM_STATE_WAIT_TIMEOUT_MS) {
+ LOG.warn("Waited " + AM_STATE_WAIT_TIMEOUT_MS/1000
+ " seconds after process completed for AppReport"
+ " to reach desired final state. Not waiting anymore."
+ "CurrentState = " + state
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java?rev=1580165&r1=1580164&r2=1580165&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java Sat Mar 22 04:37:46 2014
@@ -28,8 +28,6 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
-import org.junit.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -38,11 +36,15 @@ import org.apache.hadoop.yarn.api.Applic
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -122,7 +124,7 @@ public class TestUnmanagedAMLauncher {
}
@Test(timeout=30000)
- public void testDSShell() throws Exception {
+ public void testUMALauncher() throws Exception {
String classpath = getTestRuntimeClasspath();
String javaHome = System.getenv("JAVA_HOME");
if (javaHome == null) {
@@ -141,8 +143,18 @@ public class TestUnmanagedAMLauncher {
+ " success" };
LOG.info("Initializing Launcher");
- UnmanagedAMLauncher launcher = new UnmanagedAMLauncher(new Configuration(
- yarnCluster.getConfig()));
+ UnmanagedAMLauncher launcher =
+ new UnmanagedAMLauncher(new Configuration(yarnCluster.getConfig())) {
+ public void launchAM(ApplicationAttemptId attemptId)
+ throws IOException, YarnException {
+ YarnApplicationAttemptState attemptState =
+ rmClient.getApplicationAttemptReport(attemptId)
+ .getYarnApplicationAttemptState();
+ Assert.assertTrue(attemptState
+ .equals(YarnApplicationAttemptState.LAUNCHED));
+ super.launchAM(attemptId);
+ }
+ };
boolean initSuccess = launcher.init(args);
Assert.assertTrue(initSuccess);
LOG.info("Running Launcher");
@@ -154,7 +166,7 @@ public class TestUnmanagedAMLauncher {
}
@Test(timeout=30000)
- public void testDSShellError() throws Exception {
+ public void testUMALauncherError() throws Exception {
String classpath = getTestRuntimeClasspath();
String javaHome = System.getenv("JAVA_HOME");
if (javaHome == null) {
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1580165&r1=1580164&r2=1580165&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Sat Mar 22 04:37:46 2014
@@ -1650,11 +1650,14 @@ public class RMAppAttemptImpl implements
this.readLock.lock();
ApplicationAttemptReport attemptReport = null;
try {
+ // AM container maybe not yet allocated. and also unmangedAM doesn't have
+ // am container.
+ ContainerId amId =
+ masterContainer == null ? null : masterContainer.getId();
attemptReport = ApplicationAttemptReport.newInstance(this
.getAppAttemptId(), this.getHost(), this.getRpcPort(), this
.getTrackingUrl(), this.getDiagnostics(), YarnApplicationAttemptState
- .valueOf(this.getState().toString()), this.getMasterContainer()
- .getId());
+ .valueOf(this.getState().toString()), amId);
} finally {
this.readLock.unlock();
}