This is an automated email from the ASF dual-hosted git repository.

epayne pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 9ee5265  YARN-10178: Global Scheduler async thread crash caused by 'Comparison method violates its general contract'. Contributed by Andras Gyori (gandras) and Qi Zhu (zhuqi).
9ee5265 is described below

commit 9ee5265fb33b66421dc94034f54307fae7cff096
Author: Eric Payne <epa...@apache.org>
AuthorDate: Tue Dec 21 19:48:06 2021 +0000

    YARN-10178: Global Scheduler async thread crash caused by 'Comparison method violates its general contract'. Contributed by Andras Gyori (gandras) and Qi Zhu (zhuqi).
---
 .../PriorityUtilizationQueueOrderingPolicy.java    | 65 +++++++++++++++++-----
 1 file changed, 51 insertions(+), 14 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java
index ada665d..c475be3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -95,24 +96,26 @@ public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPoli
   /**
    * Comparator that both looks at priority and utilization
    */
-  private class PriorityQueueComparator implements  Comparator<CSQueue> {
+  private class PriorityQueueComparator
+      implements Comparator<PriorityQueueResourcesForSorting> {
 
     @Override
-    public int compare(CSQueue q1, CSQueue q2) {
+    public int compare(PriorityQueueResourcesForSorting q1Sort,
+        PriorityQueueResourcesForSorting q2Sort) {
       String p = partitionToLookAt.get();
 
-      int rc = compareQueueAccessToPartition(q1, q2, p);
+      int rc = compareQueueAccessToPartition(q1Sort.queue, q2Sort.queue, p);
       if (0 != rc) {
         return rc;
       }
 
-      float used1 = q1.getQueueCapacities().getUsedCapacity(p);
-      float used2 = q2.getQueueCapacities().getUsedCapacity(p);
+      float used1 = q1Sort.usedCapacity;
+      float used2 = q2Sort.usedCapacity;
       int p1 = 0;
       int p2 = 0;
       if (respectPriority) {
-        p1 = q1.getPriority().getPriority();
-        p2 = q2.getPriority().getPriority();
+        p1 = q1Sort.queue.getPriority().getPriority();
+        p2 = q2Sort.queue.getPriority().getPriority();
       }
 
       rc = PriorityUtilizationQueueOrderingPolicy.compare(used1, used2, p1, 
p2);
@@ -120,8 +123,8 @@ public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPoli
       // For queue with same used ratio / priority, queue with higher 
configured
       // capacity goes first
       if (0 == rc) {
-        float abs1 = q1.getQueueCapacities().getAbsoluteCapacity(p);
-        float abs2 = q2.getQueueCapacities().getAbsoluteCapacity(p);
+        float abs1 = q1Sort.absoluteCapacity;
+        float abs2 = q2Sort.absoluteCapacity;
         return Float.compare(abs2, abs1);
       }
 
@@ -156,6 +159,29 @@ public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPoli
     }
   }
 
+  /**
+   * A simple storage class to represent a snapshot of a queue.
+   */
+  public static class PriorityQueueResourcesForSorting {
+    private final float usedCapacity;
+    private final float absoluteCapacity;
+    private final CSQueue queue;
+
+    PriorityQueueResourcesForSorting(CSQueue queue) {
+      this.queue = queue;
+      this.usedCapacity =
+          queue.getQueueCapacities().
+              getUsedCapacity(partitionToLookAt.get());
+      this.absoluteCapacity =
+          queue.getQueueCapacities().
+              getAbsoluteCapacity(partitionToLookAt.get());
+    }
+
+    public CSQueue getQueue() {
+      return queue;
+    }
+  }
+
   public PriorityUtilizationQueueOrderingPolicy(boolean respectPriority) {
     this.respectPriority = respectPriority;
   }
@@ -167,12 +193,23 @@ public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPoli
 
   @Override
   public Iterator<CSQueue> getAssignmentIterator(String partition) {
-    // Since partitionToLookAt is a thread local variable, and every time we
-    // copy and sort queues, so it's safe for multi-threading environment.
+    // partitionToLookAt is a thread local variable, therefore it is safe to 
mutate it.
     PriorityUtilizationQueueOrderingPolicy.partitionToLookAt.set(partition);
-    List<CSQueue> sortedQueue = new ArrayList<>(queues);
-    Collections.sort(sortedQueue, new PriorityQueueComparator());
-    return sortedQueue.iterator();
+
+    // Sort the snapshot of the queues in order to avoid breaking the 
prerequisites of TimSort.
+    // See YARN-10178 for details.
+    List<PriorityQueueResourcesForSorting> queueSnapshots = new ArrayList<>();
+    for (CSQueue queue : queues) {
+      queueSnapshots.add(new PriorityQueueResourcesForSorting(queue));
+    }
+    Collections.sort(queueSnapshots, new PriorityQueueComparator());
+
+    List<CSQueue> sortedQueues = new ArrayList<>();
+    for (PriorityQueueResourcesForSorting queueSnapshot : queueSnapshots) {
+      sortedQueues.add(queueSnapshot.queue);
+    }
+
+    return sortedQueues.iterator();
   }
 
   @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to