Author: vinodkv
Date: Tue Mar 11 00:52:48 2014
New Revision: 1576163

URL: http://svn.apache.org/r1576163
Log:
YARN-1764. Modified YarnClient to correctly handle failover of ResourceManager 
after the submitApplication call goes through. Contributed by Xuan Gong.
svn merge --ignore-ancestry -c 1576160 ../../trunk/

Modified:
    hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt
    
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
    
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
    
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
    
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java

Modified: hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt?rev=1576163&r1=1576162&r2=1576163&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt Tue Mar 11 00:52:48 2014
@@ -234,6 +234,9 @@ Release 2.4.0 - UNRELEASED
     after getting an application-ID but before submission and can still submit to
     the newly active RM with no issues. (Xuan Gong via vinodkv)
 
+    YARN-1764. Modified YarnClient to correctly handle failover of ResourceManager
+    after the submitApplication call goes through. (Xuan Gong via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: 
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java?rev=1576163&r1=1576162&r2=1576163&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
 (original)
+++ 
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
 Tue Mar 11 00:52:48 2014
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.io.retry.Idempotent;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
@@ -58,7 +59,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 
 /**
  * <p>The protocol between clients and the <code>ResourceManager</code>
@@ -107,7 +110,16 @@ public interface ApplicationClientProtoc
    * {@link SubmitApplicationResponse} on accepting the submission and throws 
    * an exception if it rejects the submission. However, this call needs to be
    * followed by {@link #getApplicationReport(GetApplicationReportRequest)}
-   * to make sure that the application gets properly submitted.</p>
+   * to make sure that the application gets properly submitted - obtaining a
+   * {@link SubmitApplicationResponse} from ResourceManager doesn't guarantee
+   * that RM 'remembers' this application beyond failover or restart. If RM
+   * failover or RM restart happens before ResourceManager saves the
+   * application's state successfully, the subsequent
+   * {@link #getApplicationReport(GetApplicationReportRequest)} will throw
+   * an {@link ApplicationNotFoundException}. Clients need to re-submit
+   * the application with the same {@link ApplicationSubmissionContext} when
+   * they encounter the {@link ApplicationNotFoundException} on the
+   * {@link #getApplicationReport(GetApplicationReportRequest)} call.</p>
    * 
    * <p> In secure mode,the <code>ResourceManager</code> verifies access to
    * queues etc. before accepting the application submission.</p>
@@ -186,6 +198,7 @@ public interface ApplicationClientProtoc
    */
   @Public
   @Stable
+  @Idempotent
   public GetApplicationReportResponse getApplicationReport(
       GetApplicationReportRequest request) 
   throws YarnException, IOException;

Modified: 
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java?rev=1576163&r1=1576162&r2=1576163&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
 (original)
+++ 
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
 Tue Mar 11 00:52:48 2014
@@ -29,6 +29,9 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -45,7 +48,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
-import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 
@@ -84,15 +87,28 @@ public abstract class YarnClient extends
 
   /**
    * <p>
-   * Submit a new application to <code>YARN.</code> It is a blocking call, such
-   * that it will not return {@link ApplicationId} until the submitted
-   * application has been submitted and accepted by the ResourceManager.
+   * Submit a new application to <code>YARN.</code> It is a blocking call - it
+   * will not return {@link ApplicationId} until the submitted application is
+   * submitted successfully and accepted by the ResourceManager.
    * </p>
    * 
    * <p>
-   * Should provide an {@link ApplicationId} when submits a new application,
-   * otherwise, it will throw the {@link ApplicationIdNotProvidedException}
-   * </p>
+   * Users should provide an {@link ApplicationId} as part of the parameter
+   * {@link ApplicationSubmissionContext} when submitting a new application,
+   * otherwise it will throw the {@link ApplicationIdNotProvidedException}.
+   * </p>
+   *
+   * <p>This internally calls {@link ApplicationClientProtocol#submitApplication
+   * (SubmitApplicationRequest)}, and after that, it internally invokes
+   * {@link ApplicationClientProtocol#getApplicationReport
+   * (GetApplicationReportRequest)} and waits till it can make sure that the
+   * application gets properly submitted. If an RM failover or RM restart
+   * happens before the ResourceManager saves the application's state,
+   * {@link ApplicationClientProtocol
+   * #getApplicationReport(GetApplicationReportRequest)} will throw
+   * the {@link ApplicationNotFoundException}. This API automatically resubmits
+   * the application with the same {@link ApplicationSubmissionContext} when it
+   * catches the {@link ApplicationNotFoundException}.</p>
    *
    * @param appContext
    *          {@link ApplicationSubmissionContext} containing all the details
@@ -102,8 +118,9 @@ public abstract class YarnClient extends
    * @throws IOException
    * @see #createApplication()
    */
-  public abstract ApplicationId submitApplication(ApplicationSubmissionContext appContext)
-      throws YarnException, IOException;
+  public abstract ApplicationId submitApplication(
+      ApplicationSubmissionContext appContext) throws YarnException,
+      IOException;
 
   /**
    * <p>

Modified: 
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java?rev=1576163&r1=1576162&r2=1576163&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
 (original)
+++ 
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
 Tue Mar 11 00:52:48 2014
@@ -187,35 +187,43 @@ public class YarnClientImpl extends Yarn
     int pollCount = 0;
     long startTime = System.currentTimeMillis();
 
-    //TODO: YARN-1764:Handle RM fail overs after the submitApplication call.
     while (true) {
-      YarnApplicationState state =
-          getApplicationReport(applicationId).getYarnApplicationState();
-      if (!state.equals(YarnApplicationState.NEW) &&
-          !state.equals(YarnApplicationState.NEW_SAVING)) {
-        LOG.info("Submitted application " + applicationId);
-        break;
-      }
+      try {
+        YarnApplicationState state =
+            getApplicationReport(applicationId).getYarnApplicationState();
+        if (!state.equals(YarnApplicationState.NEW) &&
+            !state.equals(YarnApplicationState.NEW_SAVING)) {
+          LOG.info("Submitted application " + applicationId);
+          break;
+        }
 
-      long elapsedMillis = System.currentTimeMillis() - startTime;
-      if (enforceAsyncAPITimeout() &&
-          elapsedMillis >= asyncApiPollTimeoutMillis) {
-        throw new YarnException("Timed out while waiting for application " +
-          applicationId + " to be submitted successfully");
-      }
+        long elapsedMillis = System.currentTimeMillis() - startTime;
+        if (enforceAsyncAPITimeout() &&
+            elapsedMillis >= asyncApiPollTimeoutMillis) {
+          throw new YarnException("Timed out while waiting for application " +
+              applicationId + " to be submitted successfully");
+        }
 
-      // Notify the client through the log every 10 poll, in case the client
-      // is blocked here too long.
-      if (++pollCount % 10 == 0) {
-        LOG.info("Application submission is not finished, " +
-            "submitted application " + applicationId +
-            " is still in " + state);
-      }
-      try {
-        Thread.sleep(submitPollIntervalMillis);
-      } catch (InterruptedException ie) {
-        LOG.error("Interrupted while waiting for application " + applicationId
-            + " to be successfully submitted.");
+        // Notify the client through the log every 10 poll, in case the client
+        // is blocked here too long.
+        if (++pollCount % 10 == 0) {
+          LOG.info("Application submission is not finished, " +
+              "submitted application " + applicationId +
+              " is still in " + state);
+        }
+        try {
+          Thread.sleep(submitPollIntervalMillis);
+        } catch (InterruptedException ie) {
+          LOG.error("Interrupted while waiting for application "
+              + applicationId
+              + " to be successfully submitted.");
+        }
+      } catch (ApplicationNotFoundException ex) {
+        // FailOver or RM restart happens before RMStateStore saves
+        // ApplicationState
+        LOG.info("Re-submit application " + applicationId + "with the " +
+            "same ApplicationSubmissionContext");
+        rmClient.submitApplication(request);
       }
     }
 

Modified: 
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java?rev=1576163&r1=1576162&r2=1576163&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java
 (original)
+++ 
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java
 Tue Mar 11 00:52:48 2014
@@ -24,8 +24,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.junit.Test;
 
@@ -80,10 +82,141 @@ public class TestSubmitApplicationWithRM
       count++;
     }
     // Verify submittion is successful
-    Assert.assertFalse(rm.getApplicationReport(app.getApplicationId())
-        .getYarnApplicationState() == YarnApplicationState.NEW);
-    Assert.assertFalse(rm.getApplicationReport(app.getApplicationId())
-        .getYarnApplicationState() == YarnApplicationState.NEW_SAVING);
+    YarnApplicationState state =
+        rm.getApplicationReport(app.getApplicationId())
+            .getYarnApplicationState();
+    Assert.assertTrue(state == YarnApplicationState.ACCEPTED
+        || state == YarnApplicationState.SUBMITTED);
     Assert.assertEquals(expectedAppId, app.getApplicationId());
   }
+
+  // There are two scenarios when RM failover happens
+  // after SubmitApplication Call:
+  // 1) RMStateStore already saved the ApplicationState when failover happens
+  // 2) RMStateStore did not save the ApplicationState when failover happens
+
+  @Test
+  public void
+      testHandleRMHAafterSubmitApplicationCallWithSavedApplicationState()
+          throws Exception {
+    // Test scenario 1 when RM failover happens
+    // after SubmitApplication Call:
+    // RMStateStore already saved the ApplicationState when failover happens
+    startRMs();
+
+    // Submit Application
+    // After submission, the applicationState will be saved in RMStateStore.
+    RMApp app0 = rm1.submitApp(200);
+
+    // Do the failover
+    explicitFailover();
+
+    // Since the applicationState has already been saved in RMStateStore
+    // before failover happens, the current active rm can load the previous
+    // applicationState.
+    ApplicationReport appReport =
+        rm2.getApplicationReport(app0.getApplicationId());
+
+    // verify previous submission is successful.
+    Assert.assertTrue(appReport.getYarnApplicationState()
+        == YarnApplicationState.ACCEPTED ||
+        appReport.getYarnApplicationState()
+        == YarnApplicationState.SUBMITTED);
+  }
+
+  @Test
+  public void
+      testHandleRMHAafterSubmitApplicationCallWithoutSavedApplicationState()
+          throws Exception {
+    // Test scenario 2 when RM failover happens
+    // after SubmitApplication Call:
+    // RMStateStore did not save the ApplicationState when failover happens.
+    // Using customized RMAppManager.
+    startRMsWithCustomizedRMAppManager();
+
+    // Submit Application
+    // After submission, the applicationState will
+    // not be saved in RMStateStore
+    RMApp app0 =
+        rm1.submitApp(200, "", UserGroupInformation
+            .getCurrentUser().getShortUserName(), null, false, null,
+            configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+                YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
+            false, false);
+
+    // Do the failover
+    explicitFailover();
+
+    // Since the applicationState is not saved in RMStateStore
+    // when failover happens, the current active RM cannot load
+    // the previous applicationState.
+    // Expect ApplicationNotFoundException by calling getApplicationReport().
+    try {
+      rm2.getApplicationReport(app0.getApplicationId());
+      Assert.fail("Should get ApplicationNotFoundException here");
+    } catch (ApplicationNotFoundException ex) {
+      // expected ApplicationNotFoundException
+    }
+
+    // Submit the application with the previous ApplicationId to the current
+    // active RM. This mimics the behavior of YarnClient, which re-submits
+    // the application with the previous applicationId
+    // when it catches the ApplicationNotFoundException.
+    RMApp app1 =
+        rm2.submitApp(200, "", UserGroupInformation
+            .getCurrentUser().getShortUserName(), null, false, null,
+            configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+                YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
+            false, false, true, app0.getApplicationId());
+
+    verifySubmitApp(rm2, app1, app0.getApplicationId());
+  }
+
+  /**
+   * Test multiple calls of getApplicationReport, to make sure
+   * it is idempotent
+   */
+  @Test
+  public void testGetApplicationReportIdempotent() throws Exception{
+    // start two RMs, and transit rm1 to active, rm2 to standby
+    startRMs();
+
+    // Submit Application
+    // After submission, the applicationState will be saved in RMStateStore.
+    RMApp app = rm1.submitApp(200);
+
+    ApplicationReport appReport1 =
+        rm1.getApplicationReport(app.getApplicationId());
+    Assert.assertTrue(appReport1.getYarnApplicationState() ==
+        YarnApplicationState.ACCEPTED ||
+        appReport1.getYarnApplicationState() ==
+        YarnApplicationState.SUBMITTED);
+
+    // call getApplicationReport again
+    ApplicationReport appReport2 =
+        rm1.getApplicationReport(app.getApplicationId());
+    Assert.assertEquals(appReport1.getApplicationId(),
+        appReport2.getApplicationId());
+    Assert.assertEquals(appReport1.getYarnApplicationState(),
+        appReport2.getYarnApplicationState());
+
+    // Do the failover
+    explicitFailover();
+
+    // call getApplicationReport
+    ApplicationReport appReport3 =
+        rm2.getApplicationReport(app.getApplicationId());
+    Assert.assertEquals(appReport1.getApplicationId(),
+        appReport3.getApplicationId());
+    Assert.assertEquals(appReport1.getYarnApplicationState(),
+        appReport3.getYarnApplicationState());
+
+    // call getApplicationReport again
+    ApplicationReport appReport4 =
+        rm2.getApplicationReport(app.getApplicationId());
+    Assert.assertEquals(appReport3.getApplicationId(),
+        appReport4.getApplicationId());
+    Assert.assertEquals(appReport3.getYarnApplicationState(),
+        appReport4.getYarnApplicationState());
+  }
 }


Reply via email to