YARN-2922. ConcurrentModificationException in CapacityScheduler's LeafQueue. 
Contributed by Rohith Sharmaks.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0c944600
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0c944600
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0c944600

Branch: refs/heads/HDFS-EC
Commit: 0c9446003fa9b462f75736d42c32925d931059c6
Parents: 10415a0
Author: Tsuyoshi Ozawa <oz...@apache.org>
Authored: Mon Jan 5 00:08:31 2015 +0900
Committer: Zhe Zhang <z...@apache.org>
Committed: Mon Jan 5 14:48:37 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../scheduler/capacity/LeafQueue.java           |  4 +-
 .../scheduler/capacity/TestLeafQueue.java       | 86 ++++++++++++++++++++
 3 files changed, 91 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c944600/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index e6694f1..0d33b4a 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -308,6 +308,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2991. Fixed DrainDispatcher to reuse the draining code path in
     AsyncDispatcher. (Rohith Sharmaks via zjshen)
 
+    YARN-2922. ConcurrentModificationException in CapacityScheduler's LeafQueue.
+    (Rohith Sharmaks via ozawa)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c944600/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index f129ff4..47679a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1878,7 +1878,7 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   // return a single Resource capturing the overal amount of pending resources
-  public Resource getTotalResourcePending() {
+  public synchronized Resource getTotalResourcePending() {
     Resource ret = BuilderUtils.newResource(0, 0);
     for (FiCaSchedulerApp f : activeApplications) {
       Resources.addTo(ret, f.getTotalPendingRequests());
@@ -1887,7 +1887,7 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   @Override
-  public void collectSchedulerApplications(
+  public synchronized void collectSchedulerApplications(
       Collection<ApplicationAttemptId> apps) {
     for (FiCaSchedulerApp pendingApp : pendingApplications) {
       apps.add(pendingApp.getApplicationAttemptId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c944600/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 642363e..fb7bb2c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -37,11 +37,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -61,6 +63,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -2353,6 +2356,89 @@ public class TestLeafQueue {
     }
   }
 
+  @Test
+  public void testConcurrentAccess() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    MockRM rm = new MockRM();
+    rm.init(conf);
+    rm.start();
+
+    final String queue = "default";
+    final String user = "user";
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    final LeafQueue defaultQueue = (LeafQueue) cs.getQueue(queue);
+
+    final List<FiCaSchedulerApp> listOfApps =
+        createListOfApps(10000, user, defaultQueue);
+
+    final CyclicBarrier cb = new CyclicBarrier(2);
+    final List<ConcurrentModificationException> conException =
+        new ArrayList<ConcurrentModificationException>();
+
+    Thread submitAndRemove = new Thread(new Runnable() {
+
+      @Override
+      public void run() {
+
+        for (FiCaSchedulerApp fiCaSchedulerApp : listOfApps) {
+          defaultQueue.submitApplicationAttempt(fiCaSchedulerApp, user);
+        }
+        try {
+          cb.await();
+        } catch (Exception e) {
+          // Ignore
+        }
+        for (FiCaSchedulerApp fiCaSchedulerApp : listOfApps) {
+          defaultQueue.finishApplicationAttempt(fiCaSchedulerApp, queue);
+        }
+      }
+    }, "SubmitAndRemoveApplicationAttempt Thread");
+
+    Thread getAppsInQueue = new Thread(new Runnable() {
+      List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>();
+
+      @Override
+      public void run() {
+        try {
+          try {
+            cb.await();
+          } catch (Exception e) {
+            // Ignore
+          }
+          defaultQueue.collectSchedulerApplications(apps);
+        } catch (ConcurrentModificationException e) {
+          conException.add(e);
+        }
+      }
+
+    }, "GetAppsInQueue Thread");
+
+    submitAndRemove.start();
+    getAppsInQueue.start();
+
+    submitAndRemove.join();
+    getAppsInQueue.join();
+
+    assertTrue("ConcurrentModificationException is thrown",
+        conException.isEmpty());
+    rm.stop();
+
+  }
+
+  private List<FiCaSchedulerApp> createListOfApps(int noOfApps, String user,
+      LeafQueue defaultQueue) {
+    List<FiCaSchedulerApp> appsLists = new ArrayList<FiCaSchedulerApp>();
+    for (int i = 0; i < noOfApps; i++) {
+      ApplicationAttemptId appAttemptId_0 =
+          TestUtils.getMockApplicationAttemptId(i, 0);
+      FiCaSchedulerApp app_0 =
+          new FiCaSchedulerApp(appAttemptId_0, user, defaultQueue,
+              mock(ActiveUsersManager.class), spyRMContext);
+      appsLists.add(app_0);
+    }
+    return appsLists;
+  }
+
   private CapacitySchedulerContext mockCSContext(
       CapacitySchedulerConfiguration csConf, Resource clusterResource) {
     CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);

Reply via email to