YARN-5802. updateApplicationPriority api in scheduler should ensure to 
re-insert app to correct ordering policy. Contributed by Bibin A Chundatt


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/19b3779a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/19b3779a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/19b3779a

Branch: refs/heads/HADOOP-13345
Commit: 19b3779ae7455230ed89971141b2667eae624aab
Parents: 69dd5fa
Author: Sunil <sun...@apache.org>
Authored: Fri Nov 4 16:07:28 2016 +0530
Committer: Sunil <sun...@apache.org>
Committed: Fri Nov 4 16:07:28 2016 +0530

----------------------------------------------------------------------
 .../scheduler/capacity/LeafQueue.java           | 12 +++-
 .../resourcemanager/TestClientRMService.java    | 19 +++++
 .../capacity/TestApplicationPriority.java       | 74 ++++++++++++++++++++
 3 files changed, 102 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/19b3779a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 214c6e7..8941fdf 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -2266,12 +2266,18 @@ public class LeafQueue extends AbstractCSQueue {
     try {
       writeLock.lock();
       FiCaSchedulerApp attempt = app.getCurrentAppAttempt();
-      getOrderingPolicy().removeSchedulableEntity(attempt);
-
+      boolean isActive = orderingPolicy.removeSchedulableEntity(attempt);
+      if (!isActive) {
+        pendingOrderingPolicy.removeSchedulableEntity(attempt);
+      }
       // Update new priority in SchedulerApplication
       attempt.setPriority(newAppPriority);
 
-      getOrderingPolicy().addSchedulableEntity(attempt);
+      if (isActive) {
+        orderingPolicy.addSchedulableEntity(attempt);
+      } else {
+        pendingOrderingPolicy.addSchedulableEntity(attempt);
+      }
     } finally {
       writeLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19b3779a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index 706c274..884e236 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -1642,6 +1642,25 @@ public class TestClientRMService {
   }
 
   @Test(timeout = 120000)
+  public void testUpdatePriorityAndKillAppWithZeroClusterResource()
+      throws Exception {
+    int maxPriority = 10;
+    int appPriority = 5;
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
+        maxPriority);
+    MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    rm.start();
+    RMApp app1 = rm.submitApp(1024, Priority.newInstance(appPriority));
+    ClientRMService rmService = rm.getClientRMService();
+    testApplicationPriorityUpdation(rmService, app1, appPriority, appPriority);
+    rm.killApp(app1.getApplicationId());
+    rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+    rm.stop();
+  }
+
+  @Test(timeout = 120000)
   public void testUpdateApplicationPriorityRequest() throws Exception {
     int maxPriority = 10;
     int appPriority = 5;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19b3779a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
index 523d49d..2a346f8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -36,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
@@ -54,6 +56,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeRepo
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -733,4 +736,75 @@ public class TestApplicationPriority {
     rm2.stop();
     rm1.stop();
   }
+
+  @Test(timeout = 120000)
+  public void testUpdatePriorityOnPendingAppAndKillAttempt() throws Exception {
+    int maxPriority = 10;
+    int appPriority = 5;
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
+        maxPriority);
+    MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    rm.start();
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    CSQueue defaultQueue = (LeafQueue) cs.getQueue("default");
+
+    // Update priority and kill application with no resource
+    RMApp app1 = rm.submitApp(1024, Priority.newInstance(appPriority));
+    Collection<FiCaSchedulerApp> appsPending =
+        ((LeafQueue) defaultQueue).getPendingApplications();
+    Collection<FiCaSchedulerApp> activeApps =
+        ((LeafQueue) defaultQueue).getOrderingPolicy().getSchedulableEntities();
+
+    // Verify app is in pending state
+    Assert.assertEquals("Pending apps should be 1", 1, appsPending.size());
+    Assert.assertEquals("Active apps should be 0", 0, activeApps.size());
+
+    // kill app1 which is pending
+    killAppAndVerifyOrderingPolicy(rm, defaultQueue, 0, 0, app1);
+
+    // Check ordering policy size when resource is added
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8096, rm.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app2 = rm.submitApp(1024, Priority.newInstance(appPriority));
+    Assert.assertEquals("Pending apps should be 0", 0, appsPending.size());
+    Assert.assertEquals("Active apps should be 1", 1, activeApps.size());
+    RMApp app3 = rm.submitApp(1024, Priority.newInstance(appPriority));
+    RMApp app4 = rm.submitApp(1024, Priority.newInstance(appPriority));
+    Assert.assertEquals("Pending apps should be 2", 2, appsPending.size());
+    Assert.assertEquals("Active apps should be 1", 1, activeApps.size());
+    // kill app3, pending apps should reduce to 1
+    killAppAndVerifyOrderingPolicy(rm, defaultQueue, 1, 1, app3);
+    // kill app2, running apps is killed and pending added to running
+    killAppAndVerifyOrderingPolicy(rm, defaultQueue, 0, 1, app2);
+    // kill app4, all apps are killed and both policy size should be zero
+    killAppAndVerifyOrderingPolicy(rm, defaultQueue, 0, 0, app4);
+    rm.stop();
+  }
+
+  private void killAppAndVerifyOrderingPolicy(MockRM rm, CSQueue defaultQueue,
+      int appsPendingExpected, int activeAppsExpected, RMApp app)
+      throws YarnException {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    cs.updateApplicationPriority(Priority.newInstance(2),
+        app.getApplicationId());
+    SchedulerEvent removeAttempt;
+    removeAttempt = new AppAttemptRemovedSchedulerEvent(
+        app.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptState.KILLED,
+        false);
+    cs.handle(removeAttempt);
+    rm.drainEvents();
+    Collection<FiCaSchedulerApp> appsPending =
+        ((LeafQueue) defaultQueue).getPendingApplications();
+    Collection<FiCaSchedulerApp> activeApps =
+        ((LeafQueue) defaultQueue).getApplications();
+    Assert.assertEquals("Pending apps should be " + appsPendingExpected,
+        appsPendingExpected, appsPending.size());
+    Assert.assertEquals("Active apps should be " + activeAppsExpected,
+        activeAppsExpected, activeApps.size());
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to