Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/6042#discussion_r53744297
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
---
@@ -27,6 +30,312 @@ import org.apache.spark.unsafe.types.UTF8String
private[sql] object StatFunctions extends Logging {
+ import QuantileSummaries.Stats
+
+ /**
+ * Calculates the approximate quantile for the given column.
+ *
+ * If you need to compute multiple quantiles at once, you should use
[[multipleApproxQuantiles]]
+ *
+ * Note on the target error.
+ *
+ * The result of this algorithm has the following deterministic bound:
+ * if the DataFrame has N elements and if we request the quantile `phi`
up to error `epsi`,
+ * then the algorithm will return a sample `x` from the DataFrame so
that the *exact* rank
+ * of `x` is close to (phi * N). More precisely:
+ *
+ * floor((phi - epsi) * N) <= rank(x) <= ceil((phi + epsi) * N)
+ *
+ * Note on the algorithm used.
+ *
+ * This method implements a variation of the Greenwald-Khanna algorithm
+ * (with some speed optimizations). The algorithm was first presented in
the following article:
+ * "Space-efficient Online Computation of Quantile Summaries" by
Greenwald, Michael
+ * and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670)
+ *
+ * The performance optimizations are detailed in the comments of the
implementation.
+ *
+ * @param df the dataframe to estimate quantiles on
+ * @param col the name of the column
+ * @param quantile the target quantile of interest
+ * @param epsilon the target error. Should be >= 0.
+ * */
+ def approxQuantile(
+ df: DataFrame,
+ col: String,
+ quantile: Double,
+ epsilon: Double = QuantileSummaries.defaultEpsilon): Double = {
+ require(quantile >= 0.0 && quantile <= 1.0, "Quantile must be in the
range of (0.0, 1.0).")
+ val Seq(Seq(res)) = multipleApproxQuantiles(df, Seq(col),
Seq(quantile), epsilon)
+ res
+ }
+
+ /**
+ * Runs multiple quantile computations in a single pass, with the same
target error.
+ *
+ * See [[approxQuantile]] for more details on the approximation
guarantees.
+ *
+ * @param df the dataframe
+ * @param cols columns of the dataframe
+ * @param quantiles target quantiles to compute
+ * @param epsilon the precision to achieve
+ * @return for each column, returns the requested approximations
+ */
+ def multipleApproxQuantiles(
+ df: DataFrame,
+ cols: Seq[String],
+ quantiles: Seq[Double],
+ epsilon: Double): Seq[Seq[Double]] = {
+ val columns: Seq[Column] = cols.map { colName =>
+ val field = df.schema(colName)
+ require(field.dataType.isInstanceOf[NumericType],
+ s"Quantile calculation for column $colName with data type
${field.dataType}" +
+ " is not supported.")
+ Column(Cast(Column(colName).expr, DoubleType))
+ }
+ val emptySummaries = Array.fill(cols.size)(
+ new QuantileSummaries(QuantileSummaries.defaultCompressThreshold,
epsilon))
+
+ // Note that it works more or less by accident as `rdd.aggregate` is
not a pure function:
+ // this function returns the same array as given in the input (because
`aggregate` reuses
+ // the same argument).
+ def apply(summaries: Array[QuantileSummaries], row: Row):
Array[QuantileSummaries] = {
+ var i = 0
+ while (i < summaries.length) {
+ summaries(i) = summaries(i).insert(row.getDouble(i))
+ i += 1
+ }
+ summaries
+ }
+
+ def merge(
+ sum1: Array[QuantileSummaries],
+ sum2: Array[QuantileSummaries]): Array[QuantileSummaries] = {
+ sum1.zip(sum2).map { case (s1, s2) =>
s1.compress().merge(s2.compress()) }
+ }
+ val summaries = df.select(columns:
_*).rdd.aggregate(emptySummaries)(apply, merge)
+
+ summaries.map { summary => quantiles.map(summary.query) }
+ }
+
+ /**
+ * Helper class to compute approximate quantile summary.
+ * This implementation is based on the algorithm proposed in the paper:
+ * "Space-efficient Online Computation of Quantile Summaries" by
Greenwald, Michael
+ * and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670)
+ *
+ * In order to optimize for speed, it maintains an internal buffer of
the last seen samples,
+ * and only inserts them after crossing a certain size threshold. This
guarantees a near-constant
+ * runtime complexity compared to the original algorithm.
+ *
+ * @param compressThreshold the compression threshold: after the
internal buffer of statistics
+ * crosses this size, it attempts to compress
the statistics together
+ * @param epsilon the target precision
+ * @param sampled a buffer of quantile statistics. See the G-K article
for more details
+ * @param count the count of all the elements *inserted in the sampled
buffer*
+ * (excluding the head buffer)
+ * @param headSampled a buffer of latest samples seen so far
+ */
+ class QuantileSummaries(
+ val compressThreshold: Int,
+ val epsilon: Double,
+ val sampled: ArrayBuffer[Stats] = ArrayBuffer.empty,
+ private[stat] var count: Long = 0L,
+ val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty) extends
Serializable {
+
+ import QuantileSummaries._
+
+ def insert(x: Double): QuantileSummaries = {
+ headSampled.append(x)
+ if (headSampled.size >= defaultHeadSize) {
+ this.withHeadInserted
+ } else {
+ this
+ }
+ }
+
+ /**
+ * Inserts an array of (unsorted) samples in a batch, sorting the
array first to traverse
+ * the summary statistics in a single batch.
+ *
+ * This method does not modify the current object and returns if
necessary a new copy.
+ *
+ * @return a new quantile summary object.
+ */
+ private def withHeadInserted: QuantileSummaries = {
+ if (headSampled.isEmpty) {
+ return this
+ }
+ var currentCount = count
+ val sorted = headSampled.toArray.sorted
+ val newSamples: ArrayBuffer[Stats] = new ArrayBuffer[Stats]()
+ // The index of the next element to insert
+ var sampleIdx = 0
+ // The index of the sample currently being inserted.
+ var opsIdx: Int = 0
+ while(opsIdx < sorted.length) {
+ val currentSample = sorted(opsIdx)
+ // Add all the samples before the next observation.
+ while(sampleIdx < sampled.size && sampled(sampleIdx).value <=
currentSample) {
+ newSamples.append(sampled(sampleIdx))
+ sampleIdx += 1
+ }
+
+ // If it is the first one to insert, or if it is the last one
+ currentCount += 1
+ val delta =
+ if (newSamples.isEmpty || (sampleIdx == sampled.size && opsIdx
== sorted.length - 1)) {
+ 0
+ } else {
+ math.floor(2 * epsilon * currentCount).toInt
+ }
+
+ val tuple = Stats(currentSample, 1, delta)
+ newSamples.append(tuple)
+ opsIdx += 1
+ }
+
+ // Add all the remaining existing samples
+ while(sampleIdx < sampled.size) {
+ newSamples.append(sampled(sampleIdx))
+ sampleIdx += 1
+ }
+ new QuantileSummaries(compressThreshold, epsilon, newSamples,
currentCount)
+ }
+
+ def compress(): QuantileSummaries = {
+ // Inserts all the elements first
+ val inserted = this.withHeadInserted
+ assert(inserted.headSampled.isEmpty)
+ assert(inserted.count == count + headSampled.size)
+ val compressed =
+ compressImmut(inserted.sampled, mergeThreshold = 2 * epsilon *
inserted.count)
+ new QuantileSummaries(compressThreshold, epsilon, compressed,
inserted.count)
+ }
+
+ def merge(other: QuantileSummaries): QuantileSummaries = {
+ if (other.count == 0) {
+ this
+ } else if (count == 0) {
+ other
+ } else {
+ // We rely on the fact that they are ordered to efficiently
interleave them.
+ val thisSampled = sampled.toList
+ val otherSampled = other.sampled.toList
--- End diff --
I wonder how much speedup we can get by merging the two lists manually
compared to `(thisSampled ++ otherSampled).sorted`. Did you run some tests?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]