Author: vinodkv
Date: Tue Apr 30 05:44:06 2013
New Revision: 1477478
URL: http://svn.apache.org/r1477478
Log:
YARN-599. Refactoring submitApplication in ClientRMService and RMAppManager to
separate out various validation checks depending on whether they rely on RM
configuration or not. Contributed by Zhijie Shen.
Removed:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1477478&r1=1477477&r2=1477478&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Apr 30 05:44:06 2013
@@ -197,6 +197,10 @@ Release 2.0.5-beta - UNRELEASED
YARN-591. Moved RM recovery related records out of public API as they do
not
belong there. (vinodkv)
+ YARN-599. Refactoring submitApplication in ClientRMService and RMAppManager
+ to separate out various validation checks depending on whether they rely on
+ RM configuration or not. (Zhijie Shen via vinodkv)
+
OPTIMIZATIONS
BUG FIXES
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1477478&r1=1477477&r2=1477478&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
Tue Apr 30 05:44:06 2013
@@ -38,7 +38,6 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import
org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
@@ -72,7 +71,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@@ -83,15 +81,11 @@ import org.apache.hadoop.yarn.ipc.YarnRP
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import
org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import
org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -266,48 +260,61 @@ public class ClientRMService extends Abs
ApplicationSubmissionContext submissionContext = request
.getApplicationSubmissionContext();
ApplicationId applicationId = submissionContext.getApplicationId();
- String user = submissionContext.getAMContainerSpec().getUser();
+
+ // ApplicationSubmissionContext needs to be validated for safety - only
+ // those fields that are independent of the RM's configuration will be
+ // checked here, those that are dependent on RM configuration are validated
+ // in RMAppManager.
+
+ String user = null;
try {
+ // Safety
user = UserGroupInformation.getCurrentUser().getShortUserName();
- if (rmContext.getRMApps().get(applicationId) != null) {
- throw new IOException("Application with id " + applicationId
- + " is already present! Cannot add a duplicate!");
- }
-
- // Safety
submissionContext.getAMContainerSpec().setUser(user);
+ } catch (IOException ie) {
+ LOG.warn("Unable to get the current user.", ie);
+ RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
+ ie.getMessage(), "ClientRMService",
+ "Exception in submitting application", applicationId);
+ throw RPCUtil.getRemoteException(ie);
+ }
- // Check whether AM resource requirements are within required limits
- if (!submissionContext.getUnmanagedAM()) {
- ResourceRequest amReq = BuilderUtils.newResourceRequest(
- RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
- submissionContext.getResource(), 1);
- try {
- SchedulerUtils.validateResourceRequest(amReq,
- scheduler.getMaximumResourceCapability());
- } catch (InvalidResourceRequestException e) {
- LOG.warn("RM app submission failed in validating AM resource request"
- + " for application " + applicationId, e);
- throw RPCUtil.getRemoteException(e);
- }
- }
+ // Though duplication will checked again when app is put into rmContext,
+ // but it is good to fail the invalid submission as early as possible.
+ if (rmContext.getRMApps().get(applicationId) != null) {
+ String message = "Application with id " + applicationId +
+ " is already present! Cannot add a duplicate!";
+ LOG.warn(message);
+ RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
+ message, "ClientRMService", "Exception in submitting application",
+ applicationId);
+ throw RPCUtil.getRemoteException(message);
+ }
+
+ if (submissionContext.getQueue() == null) {
+ submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+ }
+ if (submissionContext.getApplicationName() == null) {
+ submissionContext.setApplicationName(
+ YarnConfiguration.DEFAULT_APPLICATION_NAME);
+ }
- // This needs to be synchronous as the client can query
- // immediately following the submission to get the application status.
- // So call handle directly and do not send an event.
- rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System
- .currentTimeMillis()));
+ try {
+ // call RMAppManager to submit application directly
+ rmAppManager.submitApplication(submissionContext,
+ System.currentTimeMillis(), false);
LOG.info("Application with id " + applicationId.getId() +
" submitted by user " + user);
RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
"ClientRMService", applicationId);
- } catch (IOException ie) {
- LOG.info("Exception in submitting application", ie);
- RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
- ie.getMessage(), "ClientRMService",
+ } catch (YarnRemoteException e) {
+ LOG.info("Exception in submitting application with id " +
+ applicationId.getId(), e);
+ RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
+ e.getMessage(), "ClientRMService",
"Exception in submitting application", applicationId);
- throw RPCUtil.getRemoteException(ie);
+ throw e;
}
SubmitApplicationResponse response = recordFactory
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1477478&r1=1477477&r2=1477478&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
Tue Apr 30 05:44:06 2013
@@ -31,8 +31,10 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import
org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@@ -45,8 +47,12 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.BuilderUtils;
/**
* This class manages the list of applications for the resource manager.
@@ -233,64 +239,77 @@ public class RMAppManager implements Eve
@SuppressWarnings("unchecked")
protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime,
- boolean isRecovered) {
+ boolean isRecovered) throws YarnRemoteException {
ApplicationId applicationId = submissionContext.getApplicationId();
- RMApp application = null;
- try {
- // Sanity checks
- if (submissionContext.getQueue() == null) {
- submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
- }
- if (submissionContext.getApplicationName() == null) {
- submissionContext.setApplicationName(
- YarnConfiguration.DEFAULT_APPLICATION_NAME);
- }
-
- // Create RMApp
- application =
- new RMAppImpl(applicationId, rmContext, this.conf,
- submissionContext.getApplicationName(),
- submissionContext.getAMContainerSpec().getUser(),
- submissionContext.getQueue(),
- submissionContext, this.scheduler, this.masterService,
- submitTime);
-
- // Sanity check - duplicate?
- if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
- null) {
- String message = "Application with id " + applicationId
- + " is already present! Cannot add a duplicate!";
- LOG.info(message);
- throw RPCUtil.getRemoteException(message);
- }
+ // Validation of the ApplicationSubmissionContext needs to be completed
+ // here. Only those fields that are dependent on RM's configuration are
+ // checked here as they have to be validated whether they are part of new
+ // submission or just being recovered.
+
+ // Check whether AM resource requirements are within required limits
+ if (!submissionContext.getUnmanagedAM()) {
+ ResourceRequest amReq = BuilderUtils.newResourceRequest(
+ RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
+ submissionContext.getResource(), 1);
+ try {
+ SchedulerUtils.validateResourceRequest(amReq,
+ scheduler.getMaximumResourceCapability());
+ } catch (InvalidResourceRequestException e) {
+ LOG.warn("RM app submission failed in validating AM resource request"
+ + " for application " + applicationId, e);
+ throw RPCUtil.getRemoteException(e);
+ }
+ }
- // Inform the ACLs Manager
- this.applicationACLsManager.addApplication(applicationId,
- submissionContext.getAMContainerSpec().getApplicationACLs());
+ // Create RMApp
+ RMApp application =
+ new RMAppImpl(applicationId, rmContext, this.conf,
+ submissionContext.getApplicationName(),
+ submissionContext.getAMContainerSpec().getUser(),
+ submissionContext.getQueue(),
+ submissionContext, this.scheduler, this.masterService,
+ submitTime);
+
+ // Concurrent app submissions with same applicationId will fail here
+ // Concurrent app submissions with different applicationIds will not
+ // influence each other
+ if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
+ null) {
+ String message = "Application with id " + applicationId
+ + " is already present! Cannot add a duplicate!";
+ LOG.warn(message);
+ throw RPCUtil.getRemoteException(message);
+ }
+ // Inform the ACLs Manager
+ this.applicationACLsManager.addApplication(applicationId,
+ submissionContext.getAMContainerSpec().getApplicationACLs());
+
+ try {
// Setup tokens for renewal
if (UserGroupInformation.isSecurityEnabled()) {
this.rmContext.getDelegationTokenRenewer().addApplication(
applicationId,parseCredentials(submissionContext),
submissionContext.getCancelTokensWhenComplete()
);
- }
-
- // All done, start the RMApp
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER:
- RMAppEventType.START));
+ }
} catch (IOException ie) {
- LOG.info("RMAppManager submit application exception", ie);
- if (application != null) {
- // Sending APP_REJECTED is fine, since we assume that the
- // RMApp is in NEW state and thus we havne't yet informed the
- // Scheduler about the existence of the application
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppRejectedEvent(applicationId, ie.getMessage()));
- }
+ LOG.warn(
+ "Unable to add the application to the delegation token renewer.",
+ ie);
+ // Sending APP_REJECTED is fine, since we assume that the
+ // RMApp is in NEW state and thus we havne't yet informed the
+ // Scheduler about the existence of the application
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppRejectedEvent(applicationId, ie.getMessage()));
+ throw RPCUtil.getRemoteException(ie);
}
+
+ // All done, start the RMApp
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER:
+ RMAppEventType.START));
}
private Credentials parseCredentials(ApplicationSubmissionContext
application)
@@ -377,14 +396,6 @@ public class RMAppManager implements Eve
checkAppNumCompletedLimit();
}
break;
- case APP_SUBMIT:
- {
- ApplicationSubmissionContext submissionContext =
- ((RMAppManagerSubmitEvent)event).getSubmissionContext();
- long submitTime = ((RMAppManagerSubmitEvent)event).getSubmitTime();
- submitApplication(submissionContext, submitTime, false);
- }
- break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java?rev=1477478&r1=1477477&r2=1477478&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java
Tue Apr 30 05:44:06 2013
@@ -19,6 +19,5 @@
package org.apache.hadoop.yarn.server.resourcemanager;
public enum RMAppManagerEventType {
- APP_SUBMIT,
APP_COMPLETED
}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java?rev=1477478&r1=1477477&r2=1477478&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
Tue Apr 30 05:44:06 2013
@@ -19,6 +19,9 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
@@ -31,12 +34,15 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -46,11 +52,11 @@ import org.apache.hadoop.yarn.server.res
import
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import
org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.service.Service;
-import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Lists;
@@ -163,9 +169,10 @@ public class TestAppManager{
super.setCompletedAppsMax(max);
}
public void submitApplication(
- ApplicationSubmissionContext submissionContext) {
- super.submitApplication(
- submissionContext, System.currentTimeMillis(), false);
+ ApplicationSubmissionContext submissionContext)
+ throws YarnRemoteException {
+ super.submitApplication(submissionContext, System.currentTimeMillis(),
+ false);
}
}
@@ -179,6 +186,40 @@ public class TestAppManager{
}
}
+ private RMContext rmContext;
+ private TestRMAppManager appMonitor;
+ private ApplicationSubmissionContext asContext;
+ private ApplicationId appId;
+
+ @Before
+ public void setUp() {
+ long now = System.currentTimeMillis();
+
+ rmContext = mockRMContext(1, now - 10);
+ ResourceScheduler scheduler = mockResourceScheduler();
+ Configuration conf = new Configuration();
+ ApplicationMasterService masterService =
+ new ApplicationMasterService(rmContext, scheduler);
+ appMonitor = new TestRMAppManager(rmContext,
+ new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
+ new ApplicationACLsManager(conf), conf);
+
+ appId = MockApps.newAppID(1);
+ RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ asContext =
+ recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+ asContext.setApplicationId(appId);
+ asContext.setAMContainerSpec(mockContainerLaunchContext(recordFactory));
+ asContext.setResource(mockResource());
+ setupDispatcher(rmContext, conf);
+ }
+
+ @After
+ public void tearDown() {
+ setAppEventType(RMAppEventType.KILL);
+ ((Service)rmContext.getDispatcher()).stop();
+ }
+
@Test
public void testRMAppRetireNone() throws Exception {
long now = System.currentTimeMillis();
@@ -334,38 +375,10 @@ public class TestAppManager{
@Test
public void testRMAppSubmit() throws Exception {
- long now = System.currentTimeMillis();
-
- RMContext rmContext = mockRMContext(0, now - 10);
- ResourceScheduler scheduler = new CapacityScheduler();
- Configuration conf = new Configuration();
- ApplicationMasterService masterService =
- new ApplicationMasterService(rmContext, scheduler);
- TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
- new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
- new ApplicationACLsManager(conf), conf);
-
- ApplicationId appID = MockApps.newAppID(1);
- RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
- ApplicationSubmissionContext context =
- recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
- context.setApplicationId(appID);
- ContainerLaunchContext amContainer = recordFactory
- .newRecordInstance(ContainerLaunchContext.class);
- amContainer.setApplicationACLs(new HashMap<ApplicationAccessType,
String>());
- context.setAMContainerSpec(amContainer);
- setupDispatcher(rmContext, conf);
-
- appMonitor.submitApplication(context);
- RMApp app = rmContext.getRMApps().get(appID);
+ appMonitor.submitApplication(asContext);
+ RMApp app = rmContext.getRMApps().get(appId);
Assert.assertNotNull("app is null", app);
- Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
- Assert.assertEquals("app name doesn't match",
- YarnConfiguration.DEFAULT_APPLICATION_NAME,
- app.getName());
- Assert.assertEquals("app queue doesn't match",
- YarnConfiguration.DEFAULT_QUEUE_NAME,
- app.getQueue());
+ Assert.assertEquals("app id doesn't match", appId, app.getApplicationId());
Assert.assertEquals("app state doesn't match", RMAppState.NEW,
app.getState());
// wait for event to be processed
@@ -374,9 +387,8 @@ public class TestAppManager{
timeoutSecs++ < 20) {
Thread.sleep(1000);
}
- Assert.assertEquals("app event type sent is wrong", RMAppEventType.START,
getAppEventType());
- setAppEventType(RMAppEventType.KILL);
- ((Service)rmContext.getDispatcher()).stop();
+ Assert.assertEquals("app event type sent is wrong", RMAppEventType.START,
+ getAppEventType());
}
@Test (timeout = 30000)
@@ -390,10 +402,7 @@ public class TestAppManager{
new int[]{ 1, 1, 1, 1 }};
for (int i = 0; i < globalMaxAppAttempts.length; ++i) {
for (int j = 0; j < individualMaxAppAttempts.length; ++j) {
- long now = System.currentTimeMillis();
-
- RMContext rmContext = mockRMContext(0, now - 10);
- ResourceScheduler scheduler = new CapacityScheduler();
+ ResourceScheduler scheduler = mockResourceScheduler();
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
globalMaxAppAttempts[i]);
ApplicationMasterService masterService =
@@ -402,21 +411,12 @@ public class TestAppManager{
new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
new ApplicationACLsManager(conf), conf);
- RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
- ApplicationSubmissionContext context =
-
recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
- ContainerLaunchContext amContainer = recordFactory
- .newRecordInstance(ContainerLaunchContext.class);
- amContainer.setApplicationACLs(new HashMap<ApplicationAccessType,
String>());
- context.setAMContainerSpec(amContainer);
- setupDispatcher(rmContext, conf);
-
- ApplicationId appID = MockApps.newAppID(1);
- context.setApplicationId(appID);
+ ApplicationId appID = MockApps.newAppID(i * 4 + j + 1);
+ asContext.setApplicationId(appID);
if (individualMaxAppAttempts[i][j] != 0) {
- context.setMaxAppAttempts(individualMaxAppAttempts[i][j]);
+ asContext.setMaxAppAttempts(individualMaxAppAttempts[i][j]);
}
- appMonitor.submitApplication(context);
+ appMonitor.submitApplication(asContext);
RMApp app = rmContext.getRMApps().get(appID);
Assert.assertEquals("max application attempts doesn't match",
expectedNums[i][j], app.getMaxAppAttempts());
@@ -428,96 +428,73 @@ public class TestAppManager{
Thread.sleep(1000);
}
setAppEventType(RMAppEventType.KILL);
- ((Service)rmContext.getDispatcher()).stop();
}
}
}
- @Test (timeout = 3000)
- public void testRMAppSubmitWithQueueAndName() throws Exception {
- long now = System.currentTimeMillis();
-
- RMContext rmContext = mockRMContext(1, now - 10);
- ResourceScheduler scheduler = new CapacityScheduler();
- Configuration conf = new Configuration();
- ApplicationMasterService masterService =
- new ApplicationMasterService(rmContext, scheduler);
- TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
- new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
- new ApplicationACLsManager(conf), conf);
-
- ApplicationId appID = MockApps.newAppID(10);
- RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
- ApplicationSubmissionContext context =
recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
- context.setApplicationId(appID);
- context.setApplicationName("testApp1");
- context.setQueue("testQueue");
- ContainerLaunchContext amContainer = recordFactory
- .newRecordInstance(ContainerLaunchContext.class);
- amContainer
- .setApplicationACLs(new HashMap<ApplicationAccessType, String>());
- context.setAMContainerSpec(amContainer);
+ @Test (timeout = 30000)
+ public void testRMAppSubmitDuplicateApplicationId() throws Exception {
+ ApplicationId appId = MockApps.newAppID(0);
+ asContext.setApplicationId(appId);
+ RMApp appOrig = rmContext.getRMApps().get(appId);
+ Assert.assertTrue("app name matches but shouldn't", "testApp1" !=
appOrig.getName());
- setupDispatcher(rmContext, conf);
+ // our testApp1 should be rejected and original app with same id should be
left in place
+ try {
+ appMonitor.submitApplication(asContext);
+ Assert.fail("Exception is expected when applicationId is duplicate.");
+ } catch (YarnRemoteException e) {
+ Assert.assertTrue("The thrown exception is not the expectd one.",
+ e.getMessage().contains("Cannot add a duplicate!"));
+ }
- appMonitor.submitApplication(context);
- RMApp app = rmContext.getRMApps().get(appID);
+ // make sure original app didn't get removed
+ RMApp app = rmContext.getRMApps().get(appId);
Assert.assertNotNull("app is null", app);
- Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
- Assert.assertEquals("app name doesn't match", "testApp1", app.getName());
- Assert.assertEquals("app queue doesn't match", "testQueue",
app.getQueue());
- Assert.assertEquals("app state doesn't match", RMAppState.NEW,
app.getState());
+ Assert.assertEquals("app id doesn't match", appId, app.getApplicationId());
+ Assert.assertEquals("app state doesn't match", RMAppState.FINISHED,
app.getState());
+ }
- // wait for event to be processed
- int timeoutSecs = 0;
- while ((getAppEventType() == RMAppEventType.KILL) &&
- timeoutSecs++ < 20) {
- Thread.sleep(1000);
+ @Test (timeout = 30000)
+ public void testRMAppSubmitInvalidResourceRequest() throws Exception {
+ asContext.setResource(Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1));
+
+ // submit an app
+ try {
+ appMonitor.submitApplication(asContext);
+ Assert.fail("Application submission should fail because resource" +
+ " request is invalid.");
+ } catch (YarnRemoteException e) {
+ // Exception is expected
+ Assert.assertTrue("The thrown exception is not" +
+ " InvalidResourceRequestException",
+ e.getMessage().startsWith("Invalid resource request"));
}
- Assert.assertEquals("app event type sent is wrong", RMAppEventType.START,
getAppEventType());
- setAppEventType(RMAppEventType.KILL);
- ((Service)rmContext.getDispatcher()).stop();
}
- @Test
- public void testRMAppSubmitError() throws Exception {
- long now = System.currentTimeMillis();
-
- // specify 1 here and use same appId below so it gets duplicate entry
- RMContext rmContext = mockRMContext(1, now - 10);
- ResourceScheduler scheduler = new CapacityScheduler();
- Configuration conf = new Configuration();
- ApplicationMasterService masterService =
- new ApplicationMasterService(rmContext, scheduler);
- TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
- new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
- new ApplicationACLsManager(conf), conf);
-
- ApplicationId appID = MockApps.newAppID(0);
- RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
- ApplicationSubmissionContext context =
recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
- context.setApplicationId(appID);
- context.setApplicationName("testApp1");
- context.setQueue("testQueue");
-
- setupDispatcher(rmContext, conf);
-
- RMApp appOrig = rmContext.getRMApps().get(appID);
- Assert.assertTrue("app name matches but shouldn't", "testApp1" !=
appOrig.getName());
-
- ContainerLaunchContext clc =
- BuilderUtils.newContainerLaunchContext(null, null, null, null, null,
- null, null);
- context.setAMContainerSpec(clc);
- // our testApp1 should be rejected and original app with same id should be
left in place
- appMonitor.submitApplication(context);
-
- // make sure original app didn't get removed
- RMApp app = rmContext.getRMApps().get(appID);
- Assert.assertNotNull("app is null", app);
- Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
- Assert.assertEquals("app name doesn't matches", appOrig.getName(),
app.getName());
- ((Service)rmContext.getDispatcher()).stop();
+ private static ResourceScheduler mockResourceScheduler() {
+ ResourceScheduler scheduler = mock(ResourceScheduler.class);
+ when(scheduler.getMinimumResourceCapability()).thenReturn(
+ Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
+ when(scheduler.getMaximumResourceCapability()).thenReturn(
+ Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+ return scheduler;
+ }
+
+ private static ContainerLaunchContext mockContainerLaunchContext(
+ RecordFactory recordFactory) {
+ ContainerLaunchContext amContainer = recordFactory.newRecordInstance(
+ ContainerLaunchContext.class);
+ amContainer.setApplicationACLs(new HashMap<ApplicationAccessType,
String>());;
+ return amContainer;
+ }
+
+ private static Resource mockResource() {
+ return Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
}
}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1477478&r1=1477477&r2=1477478&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
Tue Apr 30 05:44:06 2013
@@ -250,17 +250,70 @@ public class TestClientRMService {
rmContext, null, null, null, dtsm);
rmService.renewDelegationToken(request);
}
+
+ @Test (timeout = 30000)
+ @SuppressWarnings ("rawtypes")
+ public void testAppSubmit() throws Exception {
+ YarnScheduler yarnScheduler = mockYarnScheduler();
+ RMContext rmContext = mock(RMContext.class);
+ mockRMContext(yarnScheduler, rmContext);
+ RMStateStore stateStore = mock(RMStateStore.class);
+ when(rmContext.getStateStore()).thenReturn(stateStore);
+ RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
+ null, mock(ApplicationACLsManager.class), new Configuration());
+ when(rmContext.getDispatcher().getEventHandler()).thenReturn(
+ new EventHandler<Event>() {
+ public void handle(Event event) {}
+ });
+ ClientRMService rmService =
+ new ClientRMService(rmContext, yarnScheduler, appManager, null, null);
+
+ // without name and queue
+ ApplicationId appId1 = getApplicationId(100);
+ SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(
+ appId1, null, null);
+ try {
+ rmService.submitApplication(submitRequest1);
+ } catch (YarnRemoteException e) {
+ Assert.fail("Exception is not expected.");
+ }
+ RMApp app1 = rmContext.getRMApps().get(appId1);
+ Assert.assertNotNull("app doesn't exist", app1);
+ Assert.assertEquals("app name doesn't match",
+ YarnConfiguration.DEFAULT_APPLICATION_NAME, app1.getName());
+ Assert.assertEquals("app queue doesn't match",
+ YarnConfiguration.DEFAULT_QUEUE_NAME, app1.getQueue());
+
+ // with name and queue
+ String name = MockApps.newAppName();
+ String queue = MockApps.newQueue();
+ ApplicationId appId2 = getApplicationId(101);
+ SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(
+ appId2, name, queue);
+ try {
+ rmService.submitApplication(submitRequest2);
+ } catch (YarnRemoteException e) {
+ Assert.fail("Exception is not expected.");
+ }
+ RMApp app2 = rmContext.getRMApps().get(appId2);
+ Assert.assertNotNull("app doesn't exist", app2);
+ Assert.assertEquals("app name doesn't match", name, app2.getName());
+ Assert.assertEquals("app queue doesn't match", queue, app2.getQueue());
+
+ // duplicate appId
+ try {
+ rmService.submitApplication(submitRequest2);
+ Assert.fail("Exception is expected.");
+ } catch (YarnRemoteException e) {
+ Assert.assertTrue("The thrown exception is not expected.",
+ e.getMessage().contains("Cannot add a duplicate!"));
+ }
+ }
@Test(timeout=4000)
public void testConcurrentAppSubmit()
throws IOException, InterruptedException, BrokenBarrierException {
- YarnScheduler yarnScheduler = mock(YarnScheduler.class);
- when(yarnScheduler.getMinimumResourceCapability()).thenReturn(
- Resources.createResource(
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
- when(yarnScheduler.getMaximumResourceCapability()).thenReturn(
- Resources.createResource(
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+ YarnScheduler yarnScheduler = mockYarnScheduler();
RMContext rmContext = mock(RMContext.class);
mockRMContext(yarnScheduler, rmContext);
RMStateStore stateStore = mock(RMStateStore.class);
@@ -270,8 +323,10 @@ public class TestClientRMService {
final ApplicationId appId1 = getApplicationId(100);
final ApplicationId appId2 = getApplicationId(101);
- final SubmitApplicationRequest submitRequest1 =
mockSubmitAppRequest(appId1);
- final SubmitApplicationRequest submitRequest2 =
mockSubmitAppRequest(appId2);
+ final SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(
+ appId1, null, null);
+ final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(
+ appId2, null, null);
final CyclicBarrier startBarrier = new CyclicBarrier(2);
final CyclicBarrier endBarrier = new CyclicBarrier(2);
@@ -319,61 +374,23 @@ public class TestClientRMService {
t.join();
}
- @Test (timeout = 30000)
- public void testInvalidResourceRequestWhenSubmittingApplication()
- throws IOException, InterruptedException, BrokenBarrierException {
- YarnScheduler yarnScheduler = mock(YarnScheduler.class);
- when(yarnScheduler.getMinimumResourceCapability()).thenReturn(
- Resources.createResource(
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
- when(yarnScheduler.getMaximumResourceCapability()).thenReturn(
- Resources.createResource(
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
- RMContext rmContext = mock(RMContext.class);
- mockRMContext(yarnScheduler, rmContext);
- RMStateStore stateStore = mock(RMStateStore.class);
- when(rmContext.getStateStore()).thenReturn(stateStore);
- RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
- null, mock(ApplicationACLsManager.class), new Configuration());
-
- final ApplicationId appId = getApplicationId(100);
- final SubmitApplicationRequest submitRequest = mockSubmitAppRequest(appId);
- Resource resource = Resources.createResource(
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1);
- when(submitRequest.getApplicationSubmissionContext()
- .getResource()).thenReturn(resource);
-
- final ClientRMService rmService =
- new ClientRMService(rmContext, yarnScheduler, appManager, null, null);
-
- // submit an app
- try {
- rmService.submitApplication(submitRequest);
- Assert.fail("Application submission should fail because resource" +
- " request is invalid.");
- } catch (YarnRemoteException e) {
- // Exception is expected
- Assert.assertTrue("The thrown exception is not" +
- " InvalidResourceRequestException",
- e.getMessage().startsWith("Invalid resource request"));
- }
- }
-
- private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId) {
+ private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId,
+ String name, String queue) {
String user = MockApps.newUserName();
- String queue = MockApps.newQueue();
ContainerLaunchContext amContainerSpec =
mock(ContainerLaunchContext.class);
Resource resource = Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
- ApplicationSubmissionContext submissionContext =
mock(ApplicationSubmissionContext.class);
- when(submissionContext.getAMContainerSpec()).thenReturn(amContainerSpec);
- when(submissionContext.getAMContainerSpec().getUser()).thenReturn(user);
- when(submissionContext.getQueue()).thenReturn(queue);
- when(submissionContext.getApplicationId()).thenReturn(appId);
- when(submissionContext.getResource()).thenReturn(resource);
+ ApplicationSubmissionContext submissionContext =
+ recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+ submissionContext.setAMContainerSpec(amContainerSpec);
+ submissionContext.getAMContainerSpec().setUser(user);
+ submissionContext.setApplicationName(name);
+ submissionContext.setQueue(queue);
+ submissionContext.setApplicationId(appId);
+ submissionContext.setResource(resource);
SubmitApplicationRequest submitRequest =
recordFactory.newRecordInstance(SubmitApplicationRequest.class);
@@ -429,4 +446,15 @@ public class TestClientRMService {
queueName, asContext, yarnScheduler, null , System
.currentTimeMillis());
}
+
+ private static YarnScheduler mockYarnScheduler() {
+ YarnScheduler yarnScheduler = mock(YarnScheduler.class);
+ when(yarnScheduler.getMinimumResourceCapability()).thenReturn(
+ Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
+ when(yarnScheduler.getMaximumResourceCapability()).thenReturn(
+ Resources.createResource(
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+ return yarnScheduler;
+ }
}