This is an automated email from the ASF dual-hosted git repository.

epayne pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new ccaba25  YARN-10178: Global Scheduler async thread crash caused by 'Comparison method violates its general contract'. Contributed by Andras Gyori (gandras) and Qi Zhu (zhuqi).
ccaba25 is described below

commit ccaba2561a7b8aa06f33342d504e38a095932b00
Author: Eric Payne <epa...@apache.org>
AuthorDate: Tue Dec 21 19:05:39 2021 +0000

    YARN-10178: Global Scheduler async thread crash caused by 'Comparison method violates its general contract'. Contributed by Andras Gyori (gandras) and Qi Zhu (zhuqi).
    
    (cherry picked from commit e2d6fd075dff4e6ea290ec638f0a3f6688e76335)
---
 .../PriorityUtilizationQueueOrderingPolicy.java    | 91 +++++++++++++++-------
 1 file changed, 65 insertions(+), 26 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java
index d3e2f89..995c2ea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java
@@ -28,12 +28,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
     .CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 /**
  * For two queues with the same priority:
@@ -101,19 +100,21 @@ public class PriorityUtilizationQueueOrderingPolicy
   /**
    * Comparator that both looks at priority and utilization
    */
-  private class PriorityQueueComparator implements Comparator<CSQueue> {
+  private class PriorityQueueComparator
+      implements Comparator<PriorityQueueResourcesForSorting> {
 
     @Override
-    public int compare(CSQueue q1, CSQueue q2) {
+    public int compare(PriorityQueueResourcesForSorting q1Sort,
+        PriorityQueueResourcesForSorting q2Sort) {
       String p = partitionToLookAt.get();
 
-      int rc = compareQueueAccessToPartition(q1, q2, p);
+      int rc = compareQueueAccessToPartition(q1Sort.queue, q2Sort.queue, p);
       if (0 != rc) {
         return rc;
       }
 
-      float q1AbsCapacity = q1.getQueueCapacities().getAbsoluteCapacity(p);
-      float q2AbsCapacity = q2.getQueueCapacities().getAbsoluteCapacity(p);
+      float q1AbsCapacity = q1Sort.absoluteCapacity;
+      float q2AbsCapacity = q2Sort.absoluteCapacity;
 
       //If q1's abs capacity > 0 and q2 is 0, then prioritize q1
       if (Float.compare(q1AbsCapacity, 0f) > 0 && Float.compare(q2AbsCapacity,
@@ -127,28 +128,33 @@ public class PriorityUtilizationQueueOrderingPolicy
           q2AbsCapacity, 0f) == 0) {
         // both q1 has 0 and q2 has 0 capacity, then fall back to using
         // priority, abs used capacity to prioritize
-        float used1 = q1.getQueueCapacities().getAbsoluteUsedCapacity(p);
-        float used2 = q2.getQueueCapacities().getAbsoluteUsedCapacity(p);
+        float used1 = q1Sort.absoluteUsedCapacity;
+        float used2 = q2Sort.absoluteUsedCapacity;
 
-        return compare(q1, q2, used1, used2, p);
+        return compare(q1Sort, q2Sort, used1, used2,
+            q1Sort.queue.getPriority().
+                getPriority(), q2Sort.queue.getPriority().getPriority());
       } else{
         // both q1 has positive abs capacity and q2 has positive abs
         // capacity
-        float used1 = q1.getQueueCapacities().getUsedCapacity(p);
-        float used2 = q2.getQueueCapacities().getUsedCapacity(p);
+        float used1 = q1Sort.usedCapacity;
+        float used2 = q2Sort.usedCapacity;
 
-        return compare(q1, q2, used1, used2, p);
+        return compare(q1Sort, q2Sort, used1, used2,
+            q1Sort.queue.getPriority().getPriority(),
+            q2Sort.queue.getPriority().getPriority());
       }
     }
 
-    private int compare(CSQueue q1, CSQueue q2, float q1Used, float q2Used,
-        String partition) {
+    private int compare(PriorityQueueResourcesForSorting q1Sort,
+        PriorityQueueResourcesForSorting q2Sort, float q1Used,
+                        float q2Used, int q1Prior, int q2Prior) {
 
       int p1 = 0;
       int p2 = 0;
       if (respectPriority) {
-        p1 = q1.getPriority().getPriority();
-        p2 = q2.getPriority().getPriority();
+        p1 = q1Prior;
+        p2 = q2Prior;
       }
 
       int rc = PriorityUtilizationQueueOrderingPolicy.compare(q1Used, q2Used,
@@ -158,16 +164,16 @@ public class PriorityUtilizationQueueOrderingPolicy
       // capacity goes first
       if (0 == rc) {
         Resource minEffRes1 =
-            q1.getQueueResourceQuotas().getConfiguredMinResource(partition);
+            q1Sort.configuredMinResource;
         Resource minEffRes2 =
-            q2.getQueueResourceQuotas().getConfiguredMinResource(partition);
+            q2Sort.configuredMinResource;
         if (!minEffRes1.equals(Resources.none()) && !minEffRes2.equals(
             Resources.none())) {
           return minEffRes2.compareTo(minEffRes1);
         }
 
-        float abs1 = q1.getQueueCapacities().getAbsoluteCapacity(partition);
-        float abs2 = q2.getQueueCapacities().getAbsoluteCapacity(partition);
+        float abs1 = q1Sort.absoluteCapacity;
+        float abs2 = q2Sort.absoluteCapacity;
         return Float.compare(abs2, abs1);
       }
 
@@ -203,6 +209,37 @@ public class PriorityUtilizationQueueOrderingPolicy
     }
   }
 
+  /**
+   * A simple storage class to represent a snapshot of a queue.
+   */
+  public static class PriorityQueueResourcesForSorting {
+    private final float absoluteUsedCapacity;
+    private final float usedCapacity;
+    private final Resource configuredMinResource;
+    private final float absoluteCapacity;
+    private final CSQueue queue;
+
+    PriorityQueueResourcesForSorting(CSQueue queue) {
+      this.queue = queue;
+      this.absoluteUsedCapacity =
+          queue.getQueueCapacities().
+              getAbsoluteUsedCapacity(partitionToLookAt.get());
+      this.usedCapacity =
+          queue.getQueueCapacities().
+              getUsedCapacity(partitionToLookAt.get());
+      this.absoluteCapacity =
+          queue.getQueueCapacities().
+              getAbsoluteCapacity(partitionToLookAt.get());
+      this.configuredMinResource =
+          queue.getQueueResourceQuotas().
+              getConfiguredMinResource(partitionToLookAt.get());
+    }
+
+    public CSQueue getQueue() {
+      return queue;
+    }
+  }
+
   public PriorityUtilizationQueueOrderingPolicy(boolean respectPriority) {
     this.respectPriority = respectPriority;
   }
@@ -214,12 +251,14 @@ public class PriorityUtilizationQueueOrderingPolicy
 
   @Override
   public Iterator<CSQueue> getAssignmentIterator(String partition) {
-    // Since partitionToLookAt is a thread local variable, and every time we
-    // copy and sort queues, so it's safe for multi-threading environment.
+    // partitionToLookAt is a thread local variable, therefore it is safe to mutate it.
     PriorityUtilizationQueueOrderingPolicy.partitionToLookAt.set(partition);
-    List<CSQueue> sortedQueue = new ArrayList<>(queues);
-    Collections.sort(sortedQueue, new PriorityQueueComparator());
-    return sortedQueue.iterator();
+
+    // Sort the snapshot of the queues in order to avoid breaking the prerequisites of TimSort.
+    // See YARN-10178 for details.
+    return queues.stream().map(PriorityQueueResourcesForSorting::new).sorted(
+        new PriorityQueueComparator()).map(PriorityQueueResourcesForSorting::getQueue).collect(
+            Collectors.toList()).iterator();
   }
 
   @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to