srowen commented on a change in pull request #32700:
URL: https://github.com/apache/spark/pull/32700#discussion_r641943017
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
##########
@@ -229,46 +229,99 @@ class QuantileSummaries(
}
/**
- * Runs a query for a given quantile.
+ * Finds the approximate quantile for a percentile, starting at a specific index in the summary.
+ * This is a helper method that is called as we are making a pass over the summary and a sorted
+ * sequence of input percentiles.
+ *
+ * @param index The point at which to start scanning the summary for an approximate value.
+ * @param minRankAtIndex The accumulated minimum rank at the given index.
+ * @param targetError Target error from the summary.
+ * @param percentile The percentile whose value is computed.
+ * @return A tuple (i, r, a) where: i is the updated index for the next call, r is the updated
+ * rank at i, and a is the approximate quantile.
+ */
+ private def findApproxQuantile(
+ index: Int,
+ minRankAtIndex: Long,
+ targetError: Double,
+ percentile: Double): (Int, Long, Double) = {
+ var curSample = sampled(index)
+ val rank = math.ceil(percentile * count).toLong
+ var i = index
+ var minRank = minRankAtIndex
+ while (i < sampled.length - 1) {
+ val maxRank = minRank + curSample.delta
+ if (maxRank - targetError <= rank && rank <= minRank + targetError) {
+ return (i, minRank, curSample.value)
+ } else {
+ i += 1
+ curSample = sampled(i)
+ minRank += curSample.g
+ }
+ }
+ (sampled.length - 1, 0, sampled.last.value)
+ }
+
+ /**
+ * Runs a query for a given sequence of percentiles.
* The result follows the approximation guarantees detailed above.
* The query can only be run on a compressed summary: you need to call compress() before using
* it.
*
- * @param quantile the target quantile
- * @return
+ * @param percentiles the target percentiles
+ * @return the corresponding approximate quantiles, in the same order as the input
*/
- def query(quantile: Double): Option[Double] = {
- require(quantile >= 0 && quantile <= 1.0, "quantile should be in the range [0.0, 1.0]")
- require(headSampled.isEmpty,
+ def query(percentiles: Seq[Double]): Option[Seq[Double]] = {
+ percentiles.foreach(p =>
+ require(p >= 0 && p <= 1.0, "percentile should be in the range [0.0, 1.0]"))
+ require(
+ headSampled.isEmpty,
"Cannot operate on an uncompressed summary, call compress() first")
if (sampled.isEmpty) return None
- if (quantile <= relativeError) {
- return Some(sampled.head.value)
- }
+ val targetError = sampled.foldLeft(Long.MinValue)((currentMax, stats) =>
+ currentMax.max(stats.delta + stats.g)) / 2
- if (quantile >= 1 - relativeError) {
- return Some(sampled.last.value)
- }
-
- // Target rank
- val rank = math.ceil(quantile * count).toLong
- val targetError = sampled.map(s => s.delta + s.g).max / 2
+ // Index to track the current sample
+ var index = 0
// Minimum rank at current sample
- var minRank = 0L
- var i = 0
- while (i < sampled.length - 1) {
- val curSample = sampled(i)
- minRank += curSample.g
- val maxRank = minRank + curSample.delta
- if (maxRank - targetError <= rank && rank <= minRank + targetError) {
- return Some(curSample.value)
- }
- i += 1
+ var minRank = sampled(0).g
+
+ val sortedPercentiles = percentiles.zipWithIndex.sortBy(_._1)
+ val result = Array.fill(percentiles.length)(0.0)
+ sortedPercentiles.foreach {
+ case (percentile, pos) =>
+ if (percentile <= relativeError) {
+ result(pos) = sampled.head.value
+ } else if (percentile >= 1 - relativeError) {
+ result(pos) = sampled.last.value
+ } else {
+ val (newIndex, newMinRank, approxQuantile) =
+ findApproxQuantile(index, minRank, targetError, percentile)
Review comment:
I don't need a benchmark or anything, but is this much faster than if it
calls this method repeatedly? I think it saves some common computation, from
what I can see.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]