alkis commented on code in PR #40121:
URL: https://github.com/apache/spark/pull/40121#discussion_r1115579946


##########
core/src/main/scala/org/apache/spark/util/collection/PercentileHeap.scala:
##########
@@ -17,100 +17,66 @@
 
 package org.apache.spark.util.collection
 
-import scala.collection.mutable.PriorityQueue
+import java.util.PriorityQueue
 
 /**
- * PercentileHeap is designed to be used to quickly track the percentile of a 
group of numbers
- * that may contain duplicates. Inserting a new number has O(log n) time 
complexity and
- * determining the percentile has O(1) time complexity.
- * The basic idea is to maintain two heaps: a smallerHalf and a largerHalf. 
The smallerHalf
- * stores the smaller half of all numbers while the largerHalf stores the 
larger half.
- * The sizes of two heaps need to match the percentage each time when a new 
number is inserted so
- * that the ratio of their sizes is percentage to (1 - percentage). Therefore 
each time when
- * percentile() is called we check if the sizes of two heaps match the 
percentage. If they do,
- * we should return the average of the two top values of heaps. Otherwise we 
return the top of the
- * heap which exceeds its percentage.
+ * PercentileHeap tracks the percentile of a collection of numbers.
+ *
+ * Insertion is O(log n), Lookup is O(1).
+ *
+ * The implementation keeps two heaps: a small heap (`smallHeap`) and a large 
heap (`largeHeap`).
+ * The small heap stores all the numbers below the percentile and the large 
heap stores the ones
+ * above the percentile. During insertion the relative sizes of the heaps are 
adjusted to match
+ * the target percentile.
  */
-private[spark] class PercentileHeap(percentage: Double = 0.5)(implicit val 
ord: Ordering[Double]) {
-  assert(percentage >= 0 && percentage <= 1)
+private[spark] class PercentileHeap(percentage: Double = 0.5) {
+  assert(percentage > 0 && percentage < 1)
 
-  /**
-   * Stores all the numbers less than the current percentile in a smallerHalf,
-   * i.e percentile is the maximum, at the root.
-   */
-  private[this] val smallerHalf = PriorityQueue.empty[Double](ord)
+  // This is a min-heap so it works out of the box.
+  private[this] val largeHeap = new PriorityQueue[Double]
+  // This is a max-heap. If we pass a comparator things get slower because of 
function call
+  // overhead (>2x slower on insert). Instead we negate values when we 
offer/poll/peek.

Review Comment:
   It (also) optimizes the case for `null` comparator which also shows up in 
the benchmarks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to