cloud-fan commented on code in PR #40121:
URL: https://github.com/apache/spark/pull/40121#discussion_r1115561191
##########
core/src/main/scala/org/apache/spark/util/collection/PercentileHeap.scala:
##########
@@ -17,100 +17,66 @@
package org.apache.spark.util.collection
-import scala.collection.mutable.PriorityQueue
+import java.util.PriorityQueue
/**
- * PercentileHeap is designed to be used to quickly track the percentile of a group of numbers
- * that may contain duplicates. Inserting a new number has O(log n) time complexity and
- * determining the percentile has O(1) time complexity.
- * The basic idea is to maintain two heaps: a smallerHalf and a largerHalf. The smallerHalf
- * stores the smaller half of all numbers while the largerHalf stores the larger half.
- * The sizes of two heaps need to match the percentage each time when a new number is inserted so
- * that the ratio of their sizes is percentage to (1 - percentage). Therefore each time when
- * percentile() is called we check if the sizes of two heaps match the percentage. If they do,
- * we should return the average of the two top values of heaps. Otherwise we return the top of the
- * heap which exceeds its percentage.
+ * PercentileHeap tracks the percentile of a collection of numbers.
+ *
+ * Insertion is O(log n), Lookup is O(1).
+ *
+ * The implementation keeps two heaps: a small heap (`smallHeap`) and a large heap (`largeHeap`).
+ * The small heap stores all the numbers below the percentile and the large heap stores the ones
+ * above the percentile. During insertion the relative sizes of the heaps are adjusted to match
+ * the target percentile.
*/
-private[spark] class PercentileHeap(percentage: Double = 0.5)(implicit val ord: Ordering[Double]) {
- assert(percentage >= 0 && percentage <= 1)
+private[spark] class PercentileHeap(percentage: Double = 0.5) {
+ assert(percentage > 0 && percentage < 1)
- /**
- * Stores all the numbers less than the current percentile in a smallerHalf,
- * i.e percentile is the maximum, at the root.
- */
- private[this] val smallerHalf = PriorityQueue.empty[Double](ord)
+ // This is a min-heap so it works out of the box.
+ private[this] val largeHeap = new PriorityQueue[Double]
+ // This is a max-heap. If we pass a comparator things get slower because of function call
+ // overhead (>2x slower on insert). Instead we negate values when we offer/poll/peek.
+ private[this] val smallHeap = new PriorityQueue[Double]
- /**
- * Stores all the numbers greater than the current percentile in a largerHalf,
- * i.e percentile is the minimum, at the root.
- */
- private[this] val largerHalf = PriorityQueue.empty[Double](ord.reverse)
+ def isEmpty(): Boolean = smallHeap.isEmpty && largeHeap.isEmpty
- def isEmpty(): Boolean = {
- smallerHalf.isEmpty && largerHalf.isEmpty
- }
+ def size(): Int = smallHeap.size + largeHeap.size
- def size(): Int = {
- smallerHalf.size + largerHalf.size
+ /**
+ * Returns percentile of the inserted elements as if the inserted elements were sorted and we
+ * returned `sorted(p)` where `p = (sorted.length * percentage).toInt` if number of elements
+ * is odd, otherwise `(sorted(p-1) + sorted(p)) / 2` if number of elements is even.
+ */
+ def percentile(): Double = {
+ if (isEmpty) throw new NoSuchElementException("empty")
+ if (size % 2 == 1 || smallHeap.isEmpty) {
+ largeHeap.peek
+ } else {
+ (largeHeap.peek + -smallHeap.peek) / 2d
Review Comment:
```suggestion
(largeHeap.peek + -smallHeap.peek) / 2.0
```
I think `2.0` is more commonly used as a double literal than `2d` in the Java world.
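For reference, here is a minimal sketch of the two-heap scheme that the new doc comment describes, written with the `2.0` literal suggested above. It is not the code under review: the class name `TwoHeapPercentile` and its `insert` method are hypothetical, and the rebalancing rule (keep exactly `(size * percentage).toInt` values in the small heap) is an assumption chosen so that `percentile()` follows the `sorted(p)` convention stated in the diff. Like the PR, it gets a max-heap by storing negated values in a plain `java.util.PriorityQueue` rather than passing a comparator.

```scala
import java.util.PriorityQueue

// Hypothetical sketch of the two-heap percentile technique; not Spark's PercentileHeap.
class TwoHeapPercentile(percentage: Double = 0.5) {
  require(percentage > 0 && percentage < 1, "percentage must be in (0, 1)")

  // Min-heap with the values at or above the percentile (natural ordering).
  private val largeHeap = new PriorityQueue[Double]
  // Values below the percentile, stored negated so the min-heap acts as a max-heap.
  private val smallHeap = new PriorityQueue[Double]

  def isEmpty: Boolean = smallHeap.isEmpty && largeHeap.isEmpty

  def size: Int = smallHeap.size + largeHeap.size

  def insert(x: Double): Unit = {
    // Place x on the correct side so every small value stays <= every large value.
    if (largeHeap.isEmpty || x >= largeHeap.peek) largeHeap.offer(x)
    else smallHeap.offer(-x)

    // Assumed rebalancing rule: the small heap holds exactly (size * percentage).toInt
    // values. Each insert changes the target by at most one, so at most one move runs.
    val target = (size * percentage).toInt
    while (smallHeap.size > target) largeHeap.offer(-smallHeap.poll)
    while (smallHeap.size < target) smallHeap.offer(-largeHeap.poll)
  }

  // With p = (size * percentage).toInt: sorted(p) for odd sizes,
  // otherwise (sorted(p - 1) + sorted(p)) / 2.0.
  def percentile(): Double = {
    if (isEmpty) throw new NoSuchElementException("empty")
    if (size % 2 == 1 || smallHeap.isEmpty) largeHeap.peek
    else (largeHeap.peek + -smallHeap.peek) / 2.0
  }
}
```

Inserting 5, 1, 4, 2, 3 into a median tracker gives 3.0; inserting 6 afterwards gives 3.5. Both `2.0` and `2d` denote the same `Double` value, so the suggestion only affects readability, not behavior.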
--
This is an automated message from the Apache Git Service.