Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1549699&r1=1549698&r2=1549699&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Mon Dec 9 22:52:02 2013 @@ -29,11 +29,13 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; @Private @Unstable @@ -105,6 +107,17 @@ public class FSLeafQueue extends FSQueue public List<AppSchedulable> getNonRunnableAppSchedulables() { return nonRunnableAppScheds; } + + @Override + public void collectSchedulerApplications( + 
Collection<ApplicationAttemptId> apps) { + for (AppSchedulable appSched : runnableAppScheds) { + apps.add(appSched.getApp().getApplicationAttemptId()); + } + for (AppSchedulable appSched : nonRunnableAppScheds) { + apps.add(appSched.getApp().getApplicationAttemptId()); + } + } @Override public void setPolicy(SchedulingPolicy policy)
Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1549699&r1=1549698&r2=1549699&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Mon Dec 9 22:52:02 2013 @@ -28,10 +28,12 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; @Private @Unstable @@ -184,4 +186,12 @@ public class FSParentQueue extends FSQue public int getNumRunnableApps() { return runnableApps; } + + @Override + public void collectSchedulerApplications( + Collection<ApplicationAttemptId> apps) { + for (FSQueue childQueue : childQueues) { + 
childQueue.collectSchedulerApplications(apps); + } + } } Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1549699&r1=1549698&r2=1549699&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Mon Dec 9 22:52:02 2013 @@ -24,6 +24,7 @@ import java.util.Collection; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -158,7 +159,14 @@ public abstract class FSQueue extends Sc * Gets the children of this queue, if any. */ public abstract Collection<FSQueue> getChildQueues(); - + + /** + * Adds all applications in the queue and its subqueues to the given collection. 
+ * @param apps the collection to add the applications to + */ + public abstract void collectSchedulerApplications( + Collection<ApplicationAttemptId> apps); + /** * Return the number of apps for which containers can be allocated. * Includes apps in subqueues. Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1549699&r1=1549698&r2=1549699&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Mon Dec 9 22:52:02 2013 @@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -1267,4 +1268,15 @@ public 
class FairScheduler implements Re } } + @Override + public List<ApplicationAttemptId> getAppsInQueue(String queueName) { + FSQueue queue = queueMgr.getQueue(queueName); + if (queue == null) { + return null; + } + List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>(); + queue.collectSchedulerApplications(apps); + return apps; + } + } Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1549699&r1=1549698&r2=1549699&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Mon Dec 9 22:52:02 2013 @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.re import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -850,5 +851,19 @@ public class FifoScheduler implements Re QueueACL acl, String queueName) { return DEFAULT_QUEUE.hasAccess(acl, callerUGI); } + + @Override + public synchronized List<ApplicationAttemptId> getAppsInQueue(String queueName) { + if (queueName.equals(DEFAULT_QUEUE.getQueueName())) { + List<ApplicationAttemptId> apps = 
new ArrayList<ApplicationAttemptId>( + applications.size()); + for (FiCaSchedulerApp app : applications.values()) { + apps.add(app.getApplicationAttemptId()); + } + return apps; + } else { + return null; + } + } } Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java?rev=1549699&r1=1549698&r2=1549699&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java Mon Dec 9 22:52:02 2013 @@ -19,8 +19,12 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; import java.util.HashMap; import java.util.List; @@ -43,6 +47,7 @@ import org.apache.hadoop.yarn.event.Even import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -99,7 +104,7 @@ public class TestAppManager{ rmDispatcher); AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor( rmDispatcher); - return new RMContextImpl(rmDispatcher, + RMContext context = new RMContextImpl(rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, null, null, null, null, null) { @Override @@ -107,6 +112,8 @@ public class TestAppManager{ return map; } }; + ((RMContextImpl)context).setStateStore(mock(RMStateStore.class)); + return context; } public class TestAppManagerDispatcher implements @@ -142,7 +149,6 @@ public class TestAppManager{ public TestRMAppManager(RMContext context, Configuration conf) { super(context, null, null, new ApplicationACLsManager(conf), conf); - setCompletedAppsMax(YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS); } public TestRMAppManager(RMContext context, @@ -150,7 +156,6 @@ public class TestAppManager{ YarnScheduler scheduler, ApplicationMasterService masterService, ApplicationACLsManager applicationACLsManager, Configuration conf) { super(context, scheduler, masterService, applicationACLsManager, conf); - setCompletedAppsMax(YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS); } public void checkAppNumCompletedLimit() { @@ -164,9 +169,8 @@ public class TestAppManager{ public int getCompletedAppsListSize() { return super.getCompletedAppsListSize(); } - - public void setCompletedAppsMax(int max) { - super.setCompletedAppsMax(max); + public int getCompletedAppsInStateStore() { + return this.completedAppsInStateStore; } public void submitApplication( ApplicationSubmissionContext submissionContext, String user) @@ -227,9 +231,9 @@ public class TestAppManager{ // Create such that none of the applications will retire since // haven't hit max # RMContext rmContext = mockRMContext(10, now - 10); - TestRMAppManager appMonitor = new 
TestRMAppManager(rmContext, new Configuration()); - - appMonitor.setCompletedAppsMax(10); + Configuration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 10); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext,conf); Assert.assertEquals("Number of apps incorrect before checkAppTimeLimit", 10, rmContext.getRMApps().size()); @@ -243,6 +247,8 @@ public class TestAppManager{ rmContext.getRMApps().size()); Assert.assertEquals("Number of completed apps incorrect after check", 10, appMonitor.getCompletedAppsListSize()); + verify(rmContext.getStateStore(), never()).removeApplication( + isA(RMApp.class)); } @Test @@ -250,9 +256,10 @@ public class TestAppManager{ long now = System.currentTimeMillis(); RMContext rmContext = mockRMContext(10, now - 20000); - TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration()); - - appMonitor.setCompletedAppsMax(3); + Configuration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 3); + conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 3); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf); Assert.assertEquals("Number of apps incorrect before", 10, rmContext .getRMApps().size()); @@ -266,6 +273,8 @@ public class TestAppManager{ rmContext.getRMApps().size()); Assert.assertEquals("Number of completed apps incorrect after check", 3, appMonitor.getCompletedAppsListSize()); + verify(rmContext.getStateStore(), times(7)).removeApplication( + isA(RMApp.class)); } @Test @@ -274,14 +283,17 @@ public class TestAppManager{ // these parameters don't matter, override applications below RMContext rmContext = mockRMContext(10, now - 20000); - TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration()); + Configuration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 2); + 
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2); - appMonitor.setCompletedAppsMax(2); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf); // clear out applications map rmContext.getRMApps().clear(); Assert.assertEquals("map isn't empty", 0, rmContext.getRMApps().size()); + // 6 applications are in final state, 4 are not in final state. // / set with various finished states RMApp app = new MockRMApp(0, now - 20000, RMAppState.KILLED); rmContext.getRMApps().put(app.getApplicationId(), app); @@ -318,7 +330,9 @@ public class TestAppManager{ rmContext.getRMApps().size()); Assert.assertEquals("Number of completed apps incorrect after check", 2, appMonitor.getCompletedAppsListSize()); - + // 6 applications in final state, 4 of them are removed + verify(rmContext.getStateStore(), times(4)).removeApplication( + isA(RMApp.class)); } @Test @@ -342,14 +356,13 @@ public class TestAppManager{ long now = System.currentTimeMillis(); RMContext rmContext = mockRMContext(10, now - 20000); - TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration()); - + Configuration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 0); + conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 0); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf); Assert.assertEquals("Number of apps incorrect before", 10, rmContext .getRMApps().size()); - // test with 0 - appMonitor.setCompletedAppsMax(0); - addToCompletedApps(appMonitor, rmContext); Assert.assertEquals("Number of completed apps incorrect", 10, appMonitor.getCompletedAppsListSize()); @@ -360,6 +373,64 @@ public class TestAppManager{ rmContext.getRMApps().size()); Assert.assertEquals("Number of completed apps incorrect after check", 0, appMonitor.getCompletedAppsListSize()); + verify(rmContext.getStateStore(), times(10)).removeApplication( + isA(RMApp.class)); + } + + @Test + public void 
testStateStoreAppLimitLessThanMemoryAppLimit() { + long now = System.currentTimeMillis(); + RMContext rmContext = mockRMContext(10, now - 20000); + Configuration conf = new YarnConfiguration(); + int maxAppsInMemory = 8; + int maxAppsInStateStore = 4; + conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, maxAppsInMemory); + conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, + maxAppsInStateStore); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf); + + addToCompletedApps(appMonitor, rmContext); + Assert.assertEquals("Number of completed apps incorrect", 10, + appMonitor.getCompletedAppsListSize()); + appMonitor.checkAppNumCompletedLimit(); + + Assert.assertEquals("Number of apps incorrect after # completed check", + maxAppsInMemory, rmContext.getRMApps().size()); + Assert.assertEquals("Number of completed apps incorrect after check", + maxAppsInMemory, appMonitor.getCompletedAppsListSize()); + + int numRemoveAppsFromStateStore = 10 - maxAppsInStateStore; + verify(rmContext.getStateStore(), times(numRemoveAppsFromStateStore)) + .removeApplication(isA(RMApp.class)); + Assert.assertEquals(maxAppsInStateStore, + appMonitor.getCompletedAppsInStateStore()); + } + + @Test + public void testStateStoreAppLimitLargerThanMemoryAppLimit() { + long now = System.currentTimeMillis(); + RMContext rmContext = mockRMContext(10, now - 20000); + Configuration conf = new YarnConfiguration(); + int maxAppsInMemory = 8; + conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, maxAppsInMemory); + // larger than maxCompletedAppsInMemory, reset to RM_MAX_COMPLETED_APPLICATIONS. 
+ conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 1000); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf); + + addToCompletedApps(appMonitor, rmContext); + Assert.assertEquals("Number of completed apps incorrect", 10, + appMonitor.getCompletedAppsListSize()); + appMonitor.checkAppNumCompletedLimit(); + + int numRemoveApps = 10 - maxAppsInMemory; + Assert.assertEquals("Number of apps incorrect after # completed check", + maxAppsInMemory, rmContext.getRMApps().size()); + Assert.assertEquals("Number of completed apps incorrect after check", + maxAppsInMemory, appMonitor.getCompletedAppsListSize()); + verify(rmContext.getStateStore(), times(numRemoveApps)).removeApplication( + isA(RMApp.class)); + Assert.assertEquals(maxAppsInMemory, + appMonitor.getCompletedAppsInStateStore()); } protected void setupDispatcher(RMContext rmContext, Configuration conf) { Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1549699&r1=1549698&r2=1549699&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Mon Dec 9 22:52:02 2013 @@ -30,9 +30,12 @@ import static org.mockito.Mockito.spy; import 
java.io.IOException; import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.EnumSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; @@ -106,6 +109,9 @@ public class TestClientRMService { private static RMDelegationTokenSecretManager dtsm; + private final static String QUEUE_1 = "Q-1"; + private final static String QUEUE_2 = "Q-2"; + @BeforeClass public static void setupSecretManager() throws IOException { RMContext rmContext = mock(RMContext.class); @@ -438,7 +444,7 @@ public class TestClientRMService { mockAclsManager, mockQueueACLsManager, null); // Initialize appnames and queues - String[] queues = {"Q-1", "Q-2"}; + String[] queues = {QUEUE_1, QUEUE_2}; String[] appNames = {MockApps.newAppName(), MockApps.newAppName(), MockApps.newAppName()}; ApplicationId[] appIds = @@ -596,6 +602,8 @@ public class TestClientRMService { ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext, yarnScheduler); when(rmContext.getRMApps()).thenReturn(apps); + when(yarnScheduler.getAppsInQueue(eq("testqueue"))).thenReturn( + getSchedulerApps(apps)); } private ConcurrentHashMap<ApplicationId, RMApp> getRMApps( @@ -614,10 +622,23 @@ public class TestClientRMService { config, "testqueue")); return apps; } + + private List<ApplicationAttemptId> getSchedulerApps( + Map<ApplicationId, RMApp> apps) { + List<ApplicationAttemptId> schedApps = new ArrayList<ApplicationAttemptId>(); + // Return app IDs for the apps in testqueue (as defined in getRMApps) + schedApps.add(ApplicationAttemptId.newInstance(getApplicationId(1), 0)); + schedApps.add(ApplicationAttemptId.newInstance(getApplicationId(3), 0)); + return schedApps; + } - private ApplicationId getApplicationId(int id) { + private static ApplicationId getApplicationId(int id) { return 
ApplicationId.newInstance(123456, id); } + + private static ApplicationAttemptId getApplicationAttemptId(int id) { + return ApplicationAttemptId.newInstance(getApplicationId(id), 1); + } private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler, ApplicationId applicationId3, YarnConfiguration config, String queueName) { @@ -641,6 +662,10 @@ public class TestClientRMService { when(yarnScheduler.getMaximumResourceCapability()).thenReturn( Resources.createResource( YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); + when(yarnScheduler.getAppsInQueue(QUEUE_1)).thenReturn( + Arrays.asList(getApplicationAttemptId(101), getApplicationAttemptId(102))); + when(yarnScheduler.getAppsInQueue(QUEUE_2)).thenReturn( + Arrays.asList(getApplicationAttemptId(103))); return yarnScheduler; } } Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1549699&r1=1549698&r2=1549699&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Mon Dec 9 22:52:02 2013 @@ -34,7 +34,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -46,7 +45,7 @@ import org.apache.hadoop.security.Securi import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.DelegationKey; -import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; @@ -68,9 +67,6 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -80,6 +76,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -93,7 +90,6 @@ import org.apache.log4j.Logger; import org.junit.Assert; import 
org.junit.Before; import org.junit.Test; -import org.mortbay.log.Log; public class TestRMRestart { @@ -106,7 +102,6 @@ public class TestRMRestart { public void setup() throws UnknownHostException { Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); - ExitUtil.disableSystemExit(); conf = new YarnConfiguration(); UserGroupInformation.setConfiguration(conf); conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); @@ -423,6 +418,8 @@ public class TestRMRestart { rm2.getRMContext().getRMApps().get(app0.getApplicationId()); Assert.assertEquals(RMAppAttemptState.FAILED, recoveredApp .getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState()); + rm1.stop(); + rm2.stop(); } @Test @@ -629,6 +626,8 @@ public class TestRMRestart { .contains("Failing the application.")); // failed diagnostics from attempt is lost because the diagnostics from // attempt is not yet available by the time app is saving the app state. + rm1.stop(); + rm2.stop(); } @Test @@ -675,6 +674,48 @@ public class TestRMRestart { ApplicationReport appReport = verifyAppReportAfterRMRestart(app0, rm2); Assert.assertEquals(app0.getDiagnostics().toString(), appReport.getDiagnostics()); + rm1.stop(); + rm2.stop(); + } + + @Test + public void testRMRestartKilledAppWithNoAttempts() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore() { + @Override + public synchronized void storeApplicationAttemptStateInternal( + String attemptIdStr, + ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { + // ignore attempt saving request. + } + + @Override + public synchronized void updateApplicationAttemptStateInternal( + String attemptIdStr, + ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { + // ignore attempt saving request. 
+ } + }; + memStore.init(conf); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + // create app + RMApp app0 = + rm1.submitApp(200, "name", "user", + new HashMap<ApplicationAccessType, String>(), false, "default", -1, + null, "MAPREDUCE", false); + // kill the app. + rm1.killApp(app0.getApplicationId()); + rm1.waitForState(app0.getApplicationId(), RMAppState.KILLED); + + // restart rm + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + RMApp loadedApp0 = + rm2.getRMContext().getRMApps().get(app0.getApplicationId()); + rm2.waitForState(loadedApp0.getApplicationId(), RMAppState.KILLED); + Assert.assertTrue(loadedApp0.getAppAttempts().size() == 0); } @Test @@ -724,6 +765,9 @@ public class TestRMRestart { Assert.assertEquals(FinalApplicationStatus.SUCCEEDED, appReport.getFinalApplicationStatus()); Assert.assertEquals("trackingUrl", appReport.getOriginalTrackingUrl()); + + rm1.stop(); + rm2.stop(); } @Test @@ -817,6 +861,9 @@ public class TestRMRestart { // check application summary is logged for the completed apps after RM restart. verify(rm2.getRMAppManager(), times(3)).logApplicationSummary( isA(ApplicationId.class)); + + rm1.stop(); + rm2.stop(); } private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) @@ -1378,6 +1425,75 @@ public class TestRMRestart { Assert.assertTrue(rmAppState.size() == NUM_APPS); } + @Test + public void testFinishedAppRemovalAfterRMRestart() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1); + memStore.init(conf); + RMState rmState = memStore.getState(); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create an app and finish the app. 
+ RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + finishApplicationMaster(app0, rm1, nm1, am0); + + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm1 = rm2.registerNode("127.0.0.1:1234", 15120); + + Map<ApplicationId, ApplicationState> rmAppState = + rmState.getApplicationState(); + + // app0 exists in both state store and rmContext + Assert.assertEquals(RMAppState.FINISHED, + rmAppState.get(app0.getApplicationId()).getState()); + rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED); + + // create one more app and finish the app. + RMApp app1 = rm2.submitApp(200); + MockAM am1 = launchAM(app1, rm2, nm1); + finishApplicationMaster(app1, rm2, nm1, am1); + + // the first app0 gets kicked out from both rmContext and state store + Assert.assertNull(rm2.getRMContext().getRMApps() + .get(app0.getApplicationId())); + Assert.assertNull(rmAppState.get(app0.getApplicationId())); + + rm1.stop(); + rm2.stop(); + } + + // This is to test that the RM does not hang on shutdown. 
+ @Test (timeout = 10000) + public void testRMShutdown() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore() { + @Override + public synchronized void checkVersion() + throws Exception { + throw new Exception("Invalid version."); + } + }; + // start RM + memStore.init(conf); + MockRM rm1 = null; + try { + rm1 = new MockRM(conf, memStore); + rm1.start(); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("Invalid version.")); + } + Assert.assertTrue(rm1.getServiceState() == STATE.STOPPED); + } + public static class TestSecurityMockRM extends MockRM { public TestSecurityMockRM(Configuration conf, RMStateStore store) { Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java?rev=1549699&r1=1549698&r2=1549699&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java Mon Dec 9 22:52:02 2013 @@ -26,6 +26,7 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -109,6 
+110,7 @@ public class RMStateStoreTestBase extend boolean isFinalStateValid() throws Exception; void writeVersion(RMStateVersion version) throws Exception; RMStateVersion getCurrentVersion() throws Exception; + boolean appExists(RMApp app) throws Exception; } void waitNotify(TestDispatcher dispatcher) { @@ -128,7 +130,7 @@ public class RMStateStoreTestBase extend dispatcher.notified = false; } - void storeApp(RMStateStore store, ApplicationId appId, long submitTime, + RMApp storeApp(RMStateStore store, ApplicationId appId, long submitTime, long startTime) throws Exception { ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl(); @@ -141,6 +143,7 @@ public class RMStateStoreTestBase extend when(mockApp.getApplicationSubmissionContext()).thenReturn(context); when(mockApp.getUser()).thenReturn("test"); store.storeNewApplication(mockApp); + return mockApp; } ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, @@ -370,6 +373,7 @@ public class RMStateStoreTestBase extend Assert.assertEquals(keySet, secretManagerState.getMasterKeyState()); Assert.assertEquals(sequenceNumber, secretManagerState.getDTSequenceNumber()); + store.close(); } private Token<AMRMTokenIdentifier> generateAMRMToken( @@ -415,4 +419,43 @@ public class RMStateStoreTestBase extend Assert.assertTrue(t instanceof RMStateVersionIncompatibleException); } } + + public void testAppDeletion(RMStateStoreHelper stateStoreHelper) + throws Exception { + RMStateStore store = stateStoreHelper.getRMStateStore(); + store.setRMDispatcher(new TestDispatcher()); + // create and store apps + ArrayList<RMApp> appList = new ArrayList<RMApp>(); + int NUM_APPS = 5; + for (int i = 0; i < NUM_APPS; i++) { + ApplicationId appId = ApplicationId.newInstance(1383183338, i); + RMApp app = storeApp(store, appId, 123456789, 987654321); + appList.add(app); + } + + Assert.assertEquals(NUM_APPS, appList.size()); + for (RMApp app : appList) { + // wait for app to be stored. 
+ while (true) { + if (stateStoreHelper.appExists(app)) { + break; + } else { + Thread.sleep(100); + } + } + } + + for (RMApp app : appList) { + // remove the app + store.removeApplication(app); + // wait for app to be removed. + while (true) { + if (!stateStoreHelper.appExists(app)) { + break; + } else { + Thread.sleep(100); + } + } + } + } } Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java?rev=1549699&r1=1549698&r2=1549699&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java Mon Dec 9 22:52:02 2013 @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import junit.framework.Assert; @@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.conf.YarnC import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; +import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Test; @@ -69,6 +71,13 @@ public class TestFSRMStateStore extends public RMStateVersion getCurrentVersion() { return CURRENT_VERSION_INFO; } + + public Path getAppDir(String appId) { + Path rootDir = new Path(workingDirPathURI, ROOT_DIR_NAME); + Path appRootDir = new Path(rootDir, RM_APP_ROOT); + Path appDir = new Path(appRootDir, appId); + return appDir; + } } public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception { @@ -109,9 +118,16 @@ public class TestFSRMStateStore extends public RMStateVersion getCurrentVersion() throws Exception { return store.getCurrentVersion(); } + + public boolean appExists(RMApp app) throws IOException { + FileSystem fs = cluster.getFileSystem(); + Path nodePath = + store.getAppDir(app.getApplicationId().toString()); + return fs.exists(nodePath); + } } - @Test + @Test(timeout = 60000) public void testFSRMStateStore() throws Exception { HdfsConfiguration conf = new HdfsConfiguration(); MiniDFSCluster cluster = @@ -126,11 +142,8 @@ public class TestFSRMStateStore extends String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003"; ApplicationAttemptId attemptId3 = ConverterUtils.toApplicationAttemptId(appAttemptIdStr3); - Path rootDir = - new Path(fileSystemRMStateStore.fsWorkingPath, "FSRMStateRoot"); - Path appRootDir = new Path(rootDir, "RMAppRoot"); Path appDir = - new Path(appRootDir, attemptId3.getApplicationId().toString()); + fsTester.store.getAppDir(attemptId3.getApplicationId().toString()); Path tempAppAttemptFile = new Path(appDir, attemptId3.toString() + ".tmp"); fsOut = fileSystemRMStateStore.fs.create(tempAppAttemptFile, false); @@ -138,10 +151,11 @@ public class TestFSRMStateStore extends fsOut.close(); testRMAppStateStore(fsTester); - Assert.assertFalse(fileSystemRMStateStore.fsWorkingPath + 
Assert.assertFalse(fsTester.workingDirPathURI .getFileSystem(conf).exists(tempAppAttemptFile)); testRMDTSecretManagerStateStore(fsTester); testCheckVersion(fsTester); + testAppDeletion(fsTester); } finally { cluster.shutdown(); } Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java?rev=1549699&r1=1549698&r2=1549699&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java Mon Dec 9 22:52:02 2013 @@ -46,7 +46,9 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; import org.junit.Test; public class TestZKRMStateStore extends RMStateStoreTestBase { @@ -57,6 +59,7 @@ public class TestZKRMStateStore extends ZooKeeper client; TestZKRMStateStoreInternal store; + String workingZnode; class 
TestZKRMStateStoreInternal extends ZKRMStateStore { @@ -79,11 +82,16 @@ public class TestZKRMStateStore extends public RMStateVersion getCurrentVersion() { return CURRENT_VERSION_INFO; } + + public String getAppNode(String appId) { + return workingZnode + "/" + ROOT_ZNODE_NAME + "/" + RM_APP_ROOT + "/" + + appId; + } } public RMStateStore getRMStateStore() throws Exception { - String workingZnode = "/Test"; - Configuration conf = new YarnConfiguration(); + YarnConfiguration conf = new YarnConfiguration(); + workingZnode = "/Test"; conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort); conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); this.client = createClient(); @@ -107,14 +115,22 @@ public class TestZKRMStateStore extends public RMStateVersion getCurrentVersion() throws Exception { return store.getCurrentVersion(); } + + public boolean appExists(RMApp app) throws Exception { + Stat node = + client.exists(store.getAppNode(app.getApplicationId().toString()), + false); + return node !=null; + } } - @Test + @Test (timeout = 60000) public void testZKRMStateStoreRealZK() throws Exception { TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); testRMAppStateStore(zkTester); testRMDTSecretManagerStateStore(zkTester); testCheckVersion(zkTester); + testAppDeletion(zkTester); } private Configuration createHARMConf( Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java?rev=1549699&r1=1549698&r2=1549699&view=diff 
============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java Mon Dec 9 22:52:02 2013 @@ -120,7 +120,7 @@ public class TestZKRMStateStoreZKClientC TestZKClient zkClientTester = new TestZKClient(); final String path = "/test"; YarnConfiguration conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100); + conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 1000); conf.setLong(YarnConfiguration.ZK_RM_STATE_STORE_RETRY_INTERVAL_MS, 100); final ZKRMStateStore store = (ZKRMStateStore) zkClientTester.getRMStateStore(conf); Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1549699&r1=1549698&r2=1549699&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original) +++ 
hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Mon Dec 9 22:52:02 2013 @@ -651,5 +651,35 @@ public class TestCapacityScheduler { } assertFalse(failed.get()); } + + @Test + public void testGetAppsInQueue() throws Exception { + Application application_0 = new Application("user_0", "a1", resourceManager); + application_0.submit(); + + Application application_1 = new Application("user_0", "a2", resourceManager); + application_1.submit(); + + Application application_2 = new Application("user_0", "b2", resourceManager); + application_2.submit(); + + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(application_0.getApplicationAttemptId())); + assertTrue(appsInA.contains(application_1.getApplicationAttemptId())); + assertEquals(2, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(application_0.getApplicationAttemptId())); + assertTrue(appsInRoot.contains(application_1.getApplicationAttemptId())); + assertTrue(appsInRoot.contains(application_2.getApplicationAttemptId())); + assertEquals(3, appsInRoot.size()); + + Assert.assertNull(scheduler.getAppsInQueue("nonexistentqueue")); + } } Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1549699&r1=1549698&r2=1549699&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Mon Dec 9 22:52:02 2013 @@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -2490,4 +2491,40 @@ public class TestFairScheduler { assertEquals("Incorrect number of containers allocated", 1, app .getLiveContainers().size()); } + + @Test + public void testGetAppsInQueue() throws Exception { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + ApplicationAttemptId appAttId1 = + createSchedulingRequest(1024, 1, "queue1.subqueue1", "user1"); + ApplicationAttemptId appAttId2 = + createSchedulingRequest(1024, 1, "queue1.subqueue2", 
"user1"); + ApplicationAttemptId appAttId3 = + createSchedulingRequest(1024, 1, "default", "user1"); + + List<ApplicationAttemptId> apps = + scheduler.getAppsInQueue("queue1.subqueue1"); + assertEquals(1, apps.size()); + assertEquals(appAttId1, apps.get(0)); + // with and without root prefix should work + apps = scheduler.getAppsInQueue("root.queue1.subqueue1"); + assertEquals(1, apps.size()); + assertEquals(appAttId1, apps.get(0)); + + apps = scheduler.getAppsInQueue("user1"); + assertEquals(1, apps.size()); + assertEquals(appAttId3, apps.get(0)); + // with and without root prefix should work + apps = scheduler.getAppsInQueue("root.user1"); + assertEquals(1, apps.size()); + assertEquals(appAttId3, apps.get(0)); + + // apps in subqueues should be included + apps = scheduler.getAppsInQueue("queue1"); + Assert.assertEquals(2, apps.size()); + Set<ApplicationAttemptId> appAttIds = Sets.newHashSet(apps.get(0), apps.get(1)); + assertTrue(appAttIds.contains(appAttId1)); + assertTrue(appAttIds.contains(appAttId2)); + } } Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java?rev=1549699&r1=1549698&r2=1549699&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java (original) +++ 
hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java Mon Dec 9 22:52:02 2013 @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.lang.reflect.Method; @@ -555,6 +556,24 @@ public class TestFifoScheduler { Assert.assertFalse(fs.getApplication(appAttemptId).isBlacklisted(host)); rm.stop(); } + + @Test + public void testGetAppsInQueue() throws Exception { + Application application_0 = new Application("user_0", resourceManager); + application_0.submit(); + + Application application_1 = new Application("user_0", resourceManager); + application_1.submit(); + + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + List<ApplicationAttemptId> appsInDefault = scheduler.getAppsInQueue("default"); + assertTrue(appsInDefault.contains(application_0.getApplicationAttemptId())); + assertTrue(appsInDefault.contains(application_1.getApplicationAttemptId())); + assertEquals(2, appsInDefault.size()); + + Assert.assertNull(scheduler.getAppsInQueue("someotherqueue")); + } private void checkApplicationResourceUsage(int expected, Application application) {
