Github user thunterdb commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6042#discussion_r53821735
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala ---
    @@ -27,6 +30,312 @@ import org.apache.spark.unsafe.types.UTF8String
     
     private[sql] object StatFunctions extends Logging {
     
    +  import QuantileSummaries.Stats
    +
    +  /**
    +   * Calculates the approximate quantile for the given column.
    +   *
    +   * If you need to compute multiple quantiles at once, you should use [[multipleApproxQuantiles]]
    +   *
    +   * Note on the target error.
    +   *
    +   * The result of this algorithm has the following deterministic bound:
    +   * if the DataFrame has N elements and if we request the quantile `phi` up to error `epsi`,
    +   * then the algorithm will return a sample `x` from the DataFrame so that the *exact* rank
    +   * of `x` is close to (phi * N). More precisely:
    +   *
    +   *   floor((phi - epsi) * N) <= rank(x) <= ceil((phi + epsi) * N)
    +   *
    +   * Note on the algorithm used.
    +   *
    +   * This method implements a variation of the Greenwald-Khanna algorithm
    +   * (with some speed optimizations). The algorithm was first presented in the following article:
    +   * "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael
    +   * and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670)
    +   *
    +   * The performance optimizations are detailed in the comments of the implementation.
    +   *
    +   * @param df the dataframe to estimate quantiles on
    +   * @param col the name of the column
    +   * @param quantile the target quantile of interest
    +   * @param epsilon the target error. Should be >= 0.
    +   * */
    +  def approxQuantile(
    +      df: DataFrame,
    +      col: String,
    +      quantile: Double,
    +      epsilon: Double = QuantileSummaries.defaultEpsilon): Double = {
    +    require(quantile >= 0.0 && quantile <= 1.0, "Quantile must be in the range [0.0, 1.0].")
    +    val Seq(Seq(res)) = multipleApproxQuantiles(df, Seq(col), Seq(quantile), epsilon)
    +    res
    +  }
    +
    +  /**
    +   * Runs multiple quantile computations in a single pass, with the same target error.
    +   *
    +   * See [[approxQuantile]] for more details on the approximation guarantees.
    +   *
    +   * @param df the dataframe
    +   * @param cols columns of the dataframe
    +   * @param quantiles target quantiles to compute
    +   * @param epsilon the precision to achieve
    +   * @return for each column, returns the requested approximations
    +   */
    +  def multipleApproxQuantiles(
    +      df: DataFrame,
    +      cols: Seq[String],
    +      quantiles: Seq[Double],
    +      epsilon: Double): Seq[Seq[Double]] = {
    +    val columns: Seq[Column] = cols.map { colName =>
    +      val field = df.schema(colName)
    +      require(field.dataType.isInstanceOf[NumericType],
    +        s"Quantile calculation for column $colName with data type ${field.dataType}" +
    +        " is not supported.")
    +      Column(Cast(Column(colName).expr, DoubleType))
    +    }
    +    val emptySummaries = Array.fill(cols.size)(
    +      new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, epsilon))
    +
    +    // Note that it works more or less by accident as `rdd.aggregate` is not a pure function:
    +    // this function returns the same array as given in the input (because `aggregate` reuses
    +    // the same argument).
    +    def apply(summaries: Array[QuantileSummaries], row: Row): Array[QuantileSummaries] = {
    +      var i = 0
    +      while (i < summaries.length) {
    +        summaries(i) = summaries(i).insert(row.getDouble(i))
    +        i += 1
    +      }
    +      summaries
    +    }
    +
    +    def merge(
    +        sum1: Array[QuantileSummaries],
    +        sum2: Array[QuantileSummaries]): Array[QuantileSummaries] = {
    +      sum1.zip(sum2).map { case (s1, s2) => s1.compress().merge(s2.compress()) }
    +    }
    +    val summaries = df.select(columns: _*).rdd.aggregate(emptySummaries)(apply, merge)
    +
    +    summaries.map { summary => quantiles.map(summary.query) }
    +  }
    +
    +  /**
    +   * Helper class to compute approximate quantile summary.
    +   * This implementation is based on the algorithm proposed in the paper:
    +   * "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael
    +   * and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670)
    +   *
    +   * In order to optimize for speed, it maintains an internal buffer of the last seen samples,
    +   * and only inserts them after crossing a certain size threshold. This guarantees a near-constant
    +   * runtime complexity compared to the original algorithm.
    +   *
    +   * @param compressThreshold the compression threshold: after the internal buffer of statistics
    +   *                          crosses this size, it attempts to compress the statistics together
    +   * @param epsilon the target precision
    +   * @param sampled a buffer of quantile statistics. See the G-K article for more details
    +   * @param count the count of all the elements *inserted in the sampled buffer*
    +   *              (excluding the head buffer)
    +   * @param headSampled a buffer of latest samples seen so far
    +   */
    +  class QuantileSummaries(
    +      val compressThreshold: Int,
    +      val epsilon: Double,
    +      val sampled: ArrayBuffer[Stats] = ArrayBuffer.empty,
    +      private[stat] var count: Long = 0L,
    +      val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty) extends Serializable {
    +
    +    import QuantileSummaries._
    +
    +    def insert(x: Double): QuantileSummaries = {
    --- End diff --
    
    Oh yes, thanks. This is an important one. I also added documentation to the other methods.
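
A minimal standalone sketch of the rank bound from the `approxQuantile` doc comment, in plain Scala with no Spark dependency. The object name `RankBoundSketch` and its helpers are hypothetical and not part of the patch, and `rank(x)` is assumed here to be the number of elements less than or equal to `x`, which the diff does not spell out:

```scala
object RankBoundSketch {
  // Assumed definition of rank: how many elements of `data` are <= x.
  def rank(data: Seq[Double], x: Double): Long = data.count(_ <= x).toLong

  // Checks floor((phi - epsi) * N) <= rank(x) <= ceil((phi + epsi) * N),
  // where N is the number of elements in `data`.
  def withinBound(data: Seq[Double], x: Double, phi: Double, epsi: Double): Boolean = {
    val n = data.size
    val lower = math.floor((phi - epsi) * n).toLong
    val upper = math.ceil((phi + epsi) * n).toLong
    val r = rank(data, x)
    lower <= r && r <= upper
  }
}
```

For example, with the values 1.0 to 100.0, `phi = 0.5` and `epsi = 0.05`, the value 50.0 satisfies the bound while 70.0 does not. A check of this kind could serve as a test oracle for the approximate result.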

