Repository: hadoop
Updated Branches:
  refs/heads/yarn-5783 [created] 846dfa5f8


YARN-5783. Starvation tests. Patch v7


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/846dfa5f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/846dfa5f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/846dfa5f

Branch: refs/heads/yarn-5783
Commit: 846dfa5f80398de9cbc64bd4c17bd7b404d3167f
Parents: b425ca2
Author: Karthik Kambatla <ka...@cloudera.com>
Authored: Mon Nov 7 19:41:15 2016 -0800
Committer: Karthik Kambatla <ka...@cloudera.com>
Committed: Mon Nov 7 19:41:15 2016 -0800

----------------------------------------------------------------------
 .../impl/pb/ApplicationAttemptIdPBImpl.java     |  34 +++
 .../scheduler/SchedulerApplicationAttempt.java  |  16 ++
 .../scheduler/common/fica/FiCaSchedulerApp.java |  16 ++
 .../scheduler/fair/FSAppAttempt.java            |  16 ++
 .../scheduler/fair/FSPreemptionThread.java      |   2 +-
 .../scheduler/fair/FSStarvedApps.java           |  56 +++--
 .../scheduler/fair/FairScheduler.java           |   7 +-
 .../fair/FairSchedulerWithMockPreemption.java   |  58 +++++
 .../scheduler/fair/TestFSAppStarvation.java     | 245 +++++++++++++++++++
 9 files changed, 425 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/846dfa5f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptIdPBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptIdPBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptIdPBImpl.java
index 521d9cc..9cb65ae 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptIdPBImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptIdPBImpl.java
@@ -86,4 +86,38 @@ public class ApplicationAttemptIdPBImpl extends 
ApplicationAttemptId {
     proto = builder.build();
     builder = null;
   }
+
+  /**
+   * Compares this attempt id with another object by value.
+   *
+   * {@code o} is an {@link ApplicationAttemptId} record, not a protobuf
+   * message, so it must not be compared against {@code proto} or
+   * {@code builder} directly: such a comparison is never true, not even for
+   * {@code this}. The value-based comparison inherited from
+   * {@link ApplicationAttemptId} is used instead.
+   *
+   * @param o object to compare with
+   * @return true if {@code o} represents the same application attempt
+   */
+  @Override
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+
+  /**
+   * Returns a hash code consistent with {@link #equals(Object)}.
+   *
+   * Hashing {@code proto} or {@code builder} directly could yield different
+   * values for equal ids depending on which of the two currently backs this
+   * record, so the inherited {@link ApplicationAttemptId} hash is used.
+   *
+   * @return hash code of this attempt id
+   */
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
 }  

http://git-wip-us.apache.org/repos/asf/hadoop/blob/846dfa5f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index d148132..35c92ab 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -1206,6 +1206,22 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
     this.isAttemptRecovering = isRecovering;
   }
 
+  /**
+   * Two scheduler attempts are equal when they are the same instance or
+   * refer to equal application attempt ids.
+   *
+   * @param o object to compare with
+   * @return whether {@code o} denotes the same application attempt
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof SchedulerApplicationAttempt)) {
+      return false;
+    }
+    SchedulerApplicationAttempt that = (SchedulerApplicationAttempt) o;
+    return getApplicationAttemptId().equals(that.getApplicationAttemptId());
+  }
+
+  /**
+   * Hashes on the application attempt id, matching {@link #equals(Object)}.
+   *
+   * @return hash code of the underlying application attempt id
+   */
+  @Override
+  public int hashCode() {
+    return getApplicationAttemptId().hashCode();
+  }
+
   /**
    * Different state for Application Master, user can see this state from web 
UI
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/846dfa5f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index aa7ad50..4d6d9ba 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -672,4 +672,20 @@ public class FiCaSchedulerApp extends 
SchedulerApplicationAttempt {
   public ReentrantReadWriteLock.WriteLock getWriteLock() {
     return this.writeLock;
   }
+
+  /**
+   * Delegates to {@link SchedulerApplicationAttempt#equals(Object)}. Declared
+   * here only to satisfy findbugs; behavior is inherited unchanged.
+   */
+  @Override
+  public boolean equals(Object other) {
+    return super.equals(other);
+  }
+
+  /**
+   * Delegates to {@link SchedulerApplicationAttempt#hashCode()}. Declared
+   * here only to satisfy findbugs; behavior is inherited unchanged.
+   */
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/846dfa5f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 60f701b..fe61ddd 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -1086,4 +1086,20 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
     }
     return assignContainer(node, false);
   }
+
+  /**
+   * Delegates to {@link SchedulerApplicationAttempt#equals(Object)}. Declared
+   * here only to satisfy findbugs; behavior is inherited unchanged.
+   */
+  @Override
+  public boolean equals(Object other) {
+    return super.equals(other);
+  }
+
+  /**
+   * Delegates to {@link SchedulerApplicationAttempt#hashCode()}. Declared
+   * here only to satisfy findbugs; behavior is inherited unchanged.
+   */
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/846dfa5f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index 3732086..905b6f2 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -38,7 +38,7 @@ import java.util.TimerTask;
  */
 public class FSPreemptionThread extends Thread {
   private static final Log LOG = LogFactory.getLog(FSPreemptionThread.class);
-  private final FSContext context;
+  protected final FSContext context;
   private final FairScheduler scheduler;
   private final long warnTimeBeforeKill;
   private final Timer preemptionTimer;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/846dfa5f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java
index 670a12d..4f28e41 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.io.Serializable;
@@ -25,28 +24,49 @@ import java.util.Comparator;
 import java.util.concurrent.PriorityBlockingQueue;
 
 /**
- * Helper class to track starved apps.
+ * Helper class to track starved applications.
  *
  * Initially, this uses a blocking queue. We could use other data structures
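- * in the future. This class also has some methods to simplify testing.
+ * in the future. An app that is already queued, or is the app currently
+ * being processed by the reader, is not added again.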
  */
-public class FSStarvedApps {
-  private int numAppsAddedSoFar;
-  private PriorityBlockingQueue<FSAppAttempt> apps;
+class FSStarvedApps {
 
-  public FSStarvedApps() {
-    apps = new PriorityBlockingQueue<>(10, new StarvationComparator());
+  // List of apps to be processed by the preemption thread.
+  private PriorityBlockingQueue<FSAppAttempt> appsToProcess;
+
+  // App being currently processed. This assumes a single reader.
+  private FSAppAttempt appBeingProcessed;
+
+  FSStarvedApps() {
+    appsToProcess = new PriorityBlockingQueue<>(10, new 
StarvationComparator());
   }
 
-  public void addStarvedApp(FSAppAttempt app) {
-    if (!apps.contains(app)) {
-      apps.add(app);
-      numAppsAddedSoFar++;
+  /**
+   * Queue a starved application for the preemption thread, unless it is
+   * already queued or is the app currently being processed.
+   *
+   * @param app starved application to queue
+   */
+  void addStarvedApp(FSAppAttempt app) {
+    boolean alreadyTracked =
+        app.equals(appBeingProcessed) || appsToProcess.contains(app);
+    if (!alreadyTracked) {
+      appsToProcess.add(app);
     }
   }
 
-  public FSAppAttempt take() throws InterruptedException {
-    return apps.take();
+  /**
+   * Blocking call to fetch the next app to process. The returned app is
+   * tracked until the next call to this method. This tracking assumes a
+   * single reader.
+   *
+   * @return starved application to process
+   * @throws InterruptedException if interrupted while waiting
+   */
+  FSAppAttempt take() throws InterruptedException {
+    // Reset appBeingProcessed before the blocking call
+    appBeingProcessed = null;
+
+    // Blocking call to fetch the next starved application
+    FSAppAttempt app = appsToProcess.take();
+    appBeingProcessed = app;
+    return app;
   }
 
   private static class StarvationComparator implements
@@ -62,14 +82,4 @@ public class FSStarvedApps {
       return ret;
     }
   }
-
-  @VisibleForTesting
-  public int getNumAppsAddedSoFar() {
-    return numAppsAddedSoFar;
-  }
-
-  @VisibleForTesting
-  public int numStarvedApps() {
-    return apps.size();
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/846dfa5f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 6916e41..29ee6c9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -1244,7 +1244,7 @@ public class FairScheduler extends
       }
 
       if (this.conf.getPreemptionEnabled()) {
-        preemptionThread = new FSPreemptionThread(this);
+        createPreemptionThread();
       }
     } finally {
       writeLock.unlock();
@@ -1262,6 +1262,11 @@ public class FairScheduler extends
     }
   }
 
+  /**
+   * Instantiates the preemption thread. Tests override this hook to plug in
+   * a different {@link FSPreemptionThread} implementation.
+   */
+  @VisibleForTesting
+  protected void createPreemptionThread() {
+    this.preemptionThread = new FSPreemptionThread(this);
+  }
+
   private void updateReservationThreshold() {
     Resource newThreshold = Resources.multiply(
         getIncrementResourceCapability(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/846dfa5f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
new file mode 100644
index 0000000..25780cd
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * FairScheduler whose preemption thread only records the starved apps it is
+ * handed instead of preempting containers. Used by starvation tests.
+ */
+public class FairSchedulerWithMockPreemption extends FairScheduler {
+  @Override
+  protected void createPreemptionThread() {
+    preemptionThread = new MockPreemptionThread(this);
+  }
+
+  /**
+   * Preemption thread that drains the starved-apps queue and counts what it
+   * sees. The counters are written by this thread and read by the test
+   * thread, so they are held in thread-safe containers.
+   */
+  static class MockPreemptionThread extends FSPreemptionThread {
+    private final Set<FSAppAttempt> appsAdded =
+        Collections.synchronizedSet(new HashSet<FSAppAttempt>());
+    private final AtomicInteger totalAppsAdded = new AtomicInteger();
+
+    MockPreemptionThread(FairScheduler scheduler) {
+      super(scheduler);
+    }
+
+    @Override
+    public void run() {
+      while (!Thread.interrupted()) {
+        try {
+          FSAppAttempt app = context.getStarvedApps().take();
+          appsAdded.add(app);
+          totalAppsAdded.incrementAndGet();
+        } catch (InterruptedException e) {
+          // Restore the interrupt status and let the thread exit.
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+    }
+
+    /** @return number of distinct starved apps handed to this thread */
+    int uniqueAppsAdded() {
+      return appsAdded.size();
+    }
+
+    /** @return total number of starved apps handed to this thread */
+    int totalAppsAdded() {
+      return totalAppsAdded.get();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/846dfa5f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
new file mode 100644
index 0000000..0e5511b
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test class to verify identification of app starvation
+ */
+public class TestFSAppStarvation extends FairSchedulerTestBase {
+
+  private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
+  private final List<RMNode> rmNodes = new ArrayList<>();
+
+  // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
+  private static final int NODE_CAPACITY_MULTIPLE = 4;
+
+  private FairSchedulerWithMockPreemption.MockPreemptionThread 
preemptionThread;
+
+  @Before
+  public void setup() {
+    createConfiguration();
+    conf.set(YarnConfiguration.RM_SCHEDULER,
+        FairSchedulerWithMockPreemption.class.getCanonicalName());
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
+        ALLOC_FILE.getAbsolutePath());
+    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+  }
+
+  @After
+  public void teardown() {
+    ALLOC_FILE.delete();
+    conf = null;
+    if (resourceManager != null) {
+      resourceManager.stop();
+      resourceManager = null;
+    }
+  }
+
+  /*
+   * Test to verify application starvation is computed only when preemption
+   * is enabled.
+   */
+  @Test
+  public void testPreemptionDisabled() throws Exception {
+    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, false);
+
+    setupClusterAndSubmitJobs();
+
+    assertNull("Found starved apps even when preemption is turned off",
+        scheduler.getContext().getStarvedApps());
+  }
+
+  /*
+   * Test to verify application starvation is computed correctly when
+   * preemption is turned on.
+   */
+  @Test
+  public void testPreemptionEnabled() throws Exception {
+    setupClusterAndSubmitJobs();
+
+    assertNotNull("FSContext does not have an FSStarvedApps instance",
+        scheduler.getContext().getStarvedApps());
+    assertEquals("Expecting 2 starved applications, one each for the " +
+            "minshare and fairshare queues", 2,
+        preemptionThread.uniqueAppsAdded());
+
+    // Verify the apps get added again on a subsequent update
+    scheduler.update();
+    Thread.yield();
+    assertTrue("Each app is marked as starved exactly once",
+        preemptionThread.totalAppsAdded() > 
preemptionThread.uniqueAppsAdded());
+  }
+
+  /*
+   * Test to verify app starvation is computed only when the cluster
+   * utilization threshold is over the preemption threshold.
+   */
+  @Test
+  public void testClusterUtilizationThreshold() throws Exception {
+    // Set preemption threshold to 1.1, so the utilization is always lower
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 1.1f);
+
+    setupClusterAndSubmitJobs();
+
+    assertNotNull("FSContext does not have an FSStarvedApps instance",
+        scheduler.getContext().getStarvedApps());
+    assertEquals("Found starved apps when preemption threshold is over 100%", 
0,
+        preemptionThread.totalAppsAdded());
+  }
+
+  private void setupClusterAndSubmitJobs() throws Exception {
+    setupStarvedCluster();
+    submitAppsToEachLeafQueue();
+    sendNodeUpdateEvents();
+
+    // Sleep to hit the preemption timeouts
+    Thread.sleep(10);
+
+    // Scheduler update to populate starved apps
+    scheduler.update();
+
+    // Wait for apps to be processed by MockPreemptionThread
+    Thread.yield();
+  }
+
+  /**
+   * Setup the cluster for starvation testing:
+   * 1. Create FS allocation file
+   * 2. Create and start MockRM
+   * 3. Add two nodes to the cluster
+   * 4. Submit an app that uses up all resources on the cluster
+   */
+  private void setupStarvedCluster() throws IOException {
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+
+    // Default queue
+    out.println("<queue name=\"default\">");
+    out.println("</queue>");
+
+    // Queue with preemption disabled
+    out.println("<queue name=\"no-preemption\">");
+    out.println("<fairSharePreemptionThreshold>0" +
+        "</fairSharePreemptionThreshold>");
+    out.println("</queue>");
+
+    // Queue with minshare preemption enabled
+    out.println("<queue name=\"minshare\">");
+    out.println("<fairSharePreemptionThreshold>0" +
+        "</fairSharePreemptionThreshold>");
+    out.println("<minSharePreemptionTimeout>0" +
+        "</minSharePreemptionTimeout>");
+    out.println("<minResources>2048mb,2vcores</minResources>");
+    out.println("</queue>");
+
+    // Queue with fairshare preemption enabled
+    out.println("<queue name=\"fairshare\">");
+    out.println("<fairSharePreemptionThreshold>1" +
+        "</fairSharePreemptionThreshold>");
+    out.println("<fairSharePreemptionTimeout>0" +
+        "</fairSharePreemptionTimeout>");
+
+    // Child queue under fairshare with same settings
+    out.println("<queue name=\"child\">");
+    out.println("<fairSharePreemptionThreshold>1" +
+        "</fairSharePreemptionThreshold>");
+    out.println("<fairSharePreemptionTimeout>0" +
+        "</fairSharePreemptionTimeout>");
+    out.println("</queue>");
+
+    out.println("</queue>");
+
+    out.println("</allocations>");
+    out.close();
+
+    assertTrue("Allocation file does not exist, not running the test",
+        ALLOC_FILE.exists());
+
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+    preemptionThread = (FairSchedulerWithMockPreemption.MockPreemptionThread)
+        scheduler.preemptionThread;
+
+    // Create and add two nodes to the cluster
+    addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
+    addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
+
+    // Create an app that takes up all the resources on the cluster
+    ApplicationAttemptId app
+        = createSchedulingRequest(1024, 1, "root.default", "default", 8);
+
+    scheduler.update();
+    sendNodeUpdateEvents();
+
+    assertEquals(8, scheduler.getSchedulerApp(app).getLiveContainers().size());
+  }
+
+  private void submitAppsToEachLeafQueue() {
+    String queues[] = {"no-preemption", "minshare", "fairshare.child"};
+    for (String queue : queues) {
+      createSchedulingRequest(1024, 1, "root." + queue, "user", 1);
+    }
+    scheduler.update();
+  }
+
+  private void addNode(int memory, int cores) {
+    int id = rmNodes.size() + 1;
+    RMNode node =
+        MockNodes.newNodeInfo(1, Resources.createResource(memory, cores), id,
+            "127.0.0." + id);
+    scheduler.handle(new NodeAddedSchedulerEvent(node));
+    rmNodes.add(node);
+  }
+
+  private void sendNodeUpdateEvents() {
+    for (RMNode node : rmNodes) {
+      NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent =
+          new NodeUpdateSchedulerEvent(node);
+      for (int i = 0; i < NODE_CAPACITY_MULTIPLE; i++) {
+        scheduler.handle(nodeUpdateSchedulerEvent);
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to