Author: vinodkv
Date: Tue Mar 11 00:52:03 2014
New Revision: 1576160
URL: http://svn.apache.org/r1576160
Log:
YARN-1764. Modified YarnClient to correctly handle failover of ResourceManager
after the submitApplication call goes through. Contributed by Xuan Gong.
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1576160&r1=1576159&r2=1576160&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Mar 11 00:52:03 2014
@@ -281,6 +281,9 @@ Release 2.4.0 - UNRELEASED
after getting an application-ID but before submission and can still submit to
the newly active RM with no issues. (Xuan Gong via vinodkv)
+ YARN-1764. Modified YarnClient to correctly handle failover of ResourceManager
+ after the submitApplication call goes through. (Xuan Gong via vinodkv)
+
OPTIMIZATIONS
BUG FIXES
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java?rev=1576160&r1=1576159&r2=1576160&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java Tue Mar 11 00:52:03 2014
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import
org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
@@ -58,7 +59,9 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
/**
* <p>The protocol between clients and the <code>ResourceManager</code>
@@ -107,7 +110,16 @@ public interface ApplicationClientProtoc
* {@link SubmitApplicationResponse} on accepting the submission and throws
* an exception if it rejects the submission. However, this call needs to be
* followed by {@link #getApplicationReport(GetApplicationReportRequest)}
- * to make sure that the application gets properly submitted.</p>
+ * to make sure that the application gets properly submitted - obtaining a
+ * {@link SubmitApplicationResponse} from ResourceManager doesn't guarantee
+ * that RM 'remembers' this application beyond failover or restart. If RM
+ * failover or RM restart happens before ResourceManager saves the
+ * application's state successfully, the subsequent
+ * {@link #getApplicationReport(GetApplicationReportRequest)} will throw
+ * an {@link ApplicationNotFoundException}. Clients need to re-submit
+ * the application with the same {@link ApplicationSubmissionContext} when
+ * they encounter the {@link ApplicationNotFoundException} on the
+ * {@link #getApplicationReport(GetApplicationReportRequest)} call.</p>
*
* <p> In secure mode,the <code>ResourceManager</code> verifies access to
* queues etc. before accepting the application submission.</p>
@@ -186,6 +198,7 @@ public interface ApplicationClientProtoc
*/
@Public
@Stable
+ @Idempotent
public GetApplicationReportResponse getApplicationReport(
GetApplicationReportRequest request)
throws YarnException, IOException;
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java?rev=1576160&r1=1576159&r2=1576160&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java Tue Mar 11 00:52:03 2014
@@ -29,6 +29,9 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -45,7 +48,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
-import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -84,15 +87,28 @@ public abstract class YarnClient extends
/**
* <p>
- * Submit a new application to <code>YARN.</code> It is a blocking call, such
- * that it will not return {@link ApplicationId} until the submitted
- * application has been submitted and accepted by the ResourceManager.
+ * Submit a new application to <code>YARN.</code> It is a blocking call - it
+ * will not return {@link ApplicationId} until the submitted application is
+ * submitted successfully and accepted by the ResourceManager.
* </p>
*
* <p>
- * Should provide an {@link ApplicationId} when submits a new application,
- * otherwise, it will throw the {@link ApplicationIdNotProvidedException}
- * </p>
+ * Users should provide an {@link ApplicationId} as part of the parameter
+ * {@link ApplicationSubmissionContext} when submitting a new application,
+ * otherwise it will throw the {@link ApplicationIdNotProvidedException}.
+ * </p>
+ *
+ * <p>This internally calls {@link ApplicationClientProtocol#submitApplication
+ * (SubmitApplicationRequest)}, and after that, it internally invokes
+ * {@link ApplicationClientProtocol#getApplicationReport
+ * (GetApplicationReportRequest)} and waits till it can make sure that the
+ * application gets properly submitted. If RM failover or RM restart
+ * happens before ResourceManager saves the application's state,
+ * {@link ApplicationClientProtocol
+ * #getApplicationReport(GetApplicationReportRequest)} will throw
+ * the {@link ApplicationNotFoundException}. This API automatically resubmits
+ * the application with the same {@link ApplicationSubmissionContext} when it
+ * catches the {@link ApplicationNotFoundException}.</p>
*
* @param appContext
* {@link ApplicationSubmissionContext} containing all the details
@@ -102,8 +118,9 @@ public abstract class YarnClient extends
* @throws IOException
* @see #createApplication()
*/
- public abstract ApplicationId submitApplication(ApplicationSubmissionContext appContext)
- throws YarnException, IOException;
+ public abstract ApplicationId submitApplication(
+ ApplicationSubmissionContext appContext) throws YarnException,
+ IOException;
/**
* <p>
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java?rev=1576160&r1=1576159&r2=1576160&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java Tue Mar 11 00:52:03 2014
@@ -187,35 +187,43 @@ public class YarnClientImpl extends Yarn
int pollCount = 0;
long startTime = System.currentTimeMillis();
- //TODO: YARN-1764:Handle RM fail overs after the submitApplication call.
while (true) {
- YarnApplicationState state =
- getApplicationReport(applicationId).getYarnApplicationState();
- if (!state.equals(YarnApplicationState.NEW) &&
- !state.equals(YarnApplicationState.NEW_SAVING)) {
- LOG.info("Submitted application " + applicationId);
- break;
- }
+ try {
+ YarnApplicationState state =
+ getApplicationReport(applicationId).getYarnApplicationState();
+ if (!state.equals(YarnApplicationState.NEW) &&
+ !state.equals(YarnApplicationState.NEW_SAVING)) {
+ LOG.info("Submitted application " + applicationId);
+ break;
+ }
- long elapsedMillis = System.currentTimeMillis() - startTime;
- if (enforceAsyncAPITimeout() &&
- elapsedMillis >= asyncApiPollTimeoutMillis) {
- throw new YarnException("Timed out while waiting for application " +
- applicationId + " to be submitted successfully");
- }
+ long elapsedMillis = System.currentTimeMillis() - startTime;
+ if (enforceAsyncAPITimeout() &&
+ elapsedMillis >= asyncApiPollTimeoutMillis) {
+ throw new YarnException("Timed out while waiting for application " +
+ applicationId + " to be submitted successfully");
+ }
- // Notify the client through the log every 10 poll, in case the client
- // is blocked here too long.
- if (++pollCount % 10 == 0) {
- LOG.info("Application submission is not finished, " +
- "submitted application " + applicationId +
- " is still in " + state);
- }
- try {
- Thread.sleep(submitPollIntervalMillis);
- } catch (InterruptedException ie) {
- LOG.error("Interrupted while waiting for application " + applicationId
- + " to be successfully submitted.");
+ // Notify the client through the log every 10 poll, in case the client
+ // is blocked here too long.
+ if (++pollCount % 10 == 0) {
+ LOG.info("Application submission is not finished, " +
+ "submitted application " + applicationId +
+ " is still in " + state);
+ }
+ try {
+ Thread.sleep(submitPollIntervalMillis);
+ } catch (InterruptedException ie) {
+ LOG.error("Interrupted while waiting for application "
+ + applicationId
+ + " to be successfully submitted.");
+ }
+ } catch (ApplicationNotFoundException ex) {
+ // FailOver or RM restart happens before RMStateStore saves
+ // ApplicationState
+ LOG.info("Re-submit application " + applicationId + "with the " +
+ "same ApplicationSubmissionContext");
+ rmClient.submitApplication(request);
}
}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java?rev=1576160&r1=1576159&r2=1576160&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java Tue Mar 11 00:52:03 2014
@@ -24,8 +24,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.junit.Test;
@@ -80,10 +82,141 @@ public class TestSubmitApplicationWithRM
count++;
}
// Verify submittion is successful
- Assert.assertFalse(rm.getApplicationReport(app.getApplicationId())
- .getYarnApplicationState() == YarnApplicationState.NEW);
- Assert.assertFalse(rm.getApplicationReport(app.getApplicationId())
- .getYarnApplicationState() == YarnApplicationState.NEW_SAVING);
+ YarnApplicationState state =
+ rm.getApplicationReport(app.getApplicationId())
+ .getYarnApplicationState();
+ Assert.assertTrue(state == YarnApplicationState.ACCEPTED
+ || state == YarnApplicationState.SUBMITTED);
Assert.assertEquals(expectedAppId, app.getApplicationId());
}
+
+ // There are two scenarios when RM failover happens
+ // after SubmitApplication Call:
+ // 1) RMStateStore already saved the ApplicationState when failover happens
+ // 2) RMStateStore did not save the ApplicationState when failover happens
+
+ @Test
+ public void
+ testHandleRMHAafterSubmitApplicationCallWithSavedApplicationState()
+ throws Exception {
+ // Test scenario 1 when RM failover happens
+ // after SubmitApplication Call:
+ // RMStateStore already saved the ApplicationState when failover happens
+ startRMs();
+
+ // Submit Application
+ // After submission, the applicationState will be saved in RMStateStore.
+ RMApp app0 = rm1.submitApp(200);
+
+ // Do the failover
+ explicitFailover();
+
+ // Since the applicationState has already been saved in RMStateStore
+ // before failover happens, the current active rm can load the previous
+ // applicationState.
+ ApplicationReport appReport =
+ rm2.getApplicationReport(app0.getApplicationId());
+
+ // verify previous submission is successful.
+ Assert.assertTrue(appReport.getYarnApplicationState()
+ == YarnApplicationState.ACCEPTED ||
+ appReport.getYarnApplicationState()
+ == YarnApplicationState.SUBMITTED);
+ }
+
+ @Test
+ public void
+ testHandleRMHAafterSubmitApplicationCallWithoutSavedApplicationState()
+ throws Exception {
+ // Test scenario 2 when RM failover happens
+ // after SubmitApplication Call:
+ // RMStateStore did not save the ApplicationState when failover happens.
+ // Using customized RMAppManager.
+ startRMsWithCustomizedRMAppManager();
+
+ // Submit Application
+ // After submission, the applicationState will
+ // not be saved in RMStateStore
+ RMApp app0 =
+ rm1.submitApp(200, "", UserGroupInformation
+ .getCurrentUser().getShortUserName(), null, false, null,
+ configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
+ false, false);
+
+ // Do the failover
+ explicitFailover();
+
+ // Since the applicationState is not saved in RMStateStore
+ // when failover happens. The current active RM can not load
+ // previous applicationState.
+ // Expect ApplicationNotFoundException by calling getApplicationReport().
+ try {
+ rm2.getApplicationReport(app0.getApplicationId());
+ Assert.fail("Should get ApplicationNotFoundException here");
+ } catch (ApplicationNotFoundException ex) {
+ // expected ApplicationNotFoundException
+ }
+
+ // Submit the application with previous ApplicationId to current active RM
+ // This will mimic the similar behavior of YarnClient which will re-submit
+ // Application with previous applicationId
+ // when catches the ApplicationNotFoundException
+ RMApp app1 =
+ rm2.submitApp(200, "", UserGroupInformation
+ .getCurrentUser().getShortUserName(), null, false, null,
+ configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
+ false, false, true, app0.getApplicationId());
+
+ verifySubmitApp(rm2, app1, app0.getApplicationId());
+ }
+
+ /**
+ * Test multiple calls of getApplicationReport, to make sure
+ * it is idempotent
+ */
+ @Test
+ public void testGetApplicationReportIdempotent() throws Exception{
+ // start two RMs, and transit rm1 to active, rm2 to standby
+ startRMs();
+
+ // Submit Application
+ // After submission, the applicationState will be saved in RMStateStore.
+ RMApp app = rm1.submitApp(200);
+
+ ApplicationReport appReport1 =
+ rm1.getApplicationReport(app.getApplicationId());
+ Assert.assertTrue(appReport1.getYarnApplicationState() ==
+ YarnApplicationState.ACCEPTED ||
+ appReport1.getYarnApplicationState() ==
+ YarnApplicationState.SUBMITTED);
+
+ // call getApplicationReport again
+ ApplicationReport appReport2 =
+ rm1.getApplicationReport(app.getApplicationId());
+ Assert.assertEquals(appReport1.getApplicationId(),
+ appReport2.getApplicationId());
+ Assert.assertEquals(appReport1.getYarnApplicationState(),
+ appReport2.getYarnApplicationState());
+
+ // Do the failover
+ explicitFailover();
+
+ // call getApplicationReport
+ ApplicationReport appReport3 =
+ rm2.getApplicationReport(app.getApplicationId());
+ Assert.assertEquals(appReport1.getApplicationId(),
+ appReport3.getApplicationId());
+ Assert.assertEquals(appReport1.getYarnApplicationState(),
+ appReport3.getYarnApplicationState());
+
+ // call getApplicationReport again
+ ApplicationReport appReport4 =
+ rm2.getApplicationReport(app.getApplicationId());
+ Assert.assertEquals(appReport3.getApplicationId(),
+ appReport4.getApplicationId());
+ Assert.assertEquals(appReport3.getYarnApplicationState(),
+ appReport4.getYarnApplicationState());
+ }
}