Github user thunterdb commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6042#discussion_r53821857
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala 
---
    @@ -27,6 +30,312 @@ import org.apache.spark.unsafe.types.UTF8String
     
     private[sql] object StatFunctions extends Logging {
     
    +  import QuantileSummaries.Stats
    +
    +  /**
    +   * Calculates the approximate quantile for the given column.
    +   *
    +   * If you need to compute multiple quantiles at once, you should use 
[[multipleApproxQuantiles]]
    +   *
    +   * Note on the target error.
    +   *
    +   * The result of this algorithm has the following deterministic bound:
    +   * if the DataFrame has N elements and if we request the quantile `phi` 
up to error `epsi`,
    +   * then the algorithm will return a sample `x` from the DataFrame so 
that the *exact* rank
    +   * of `x` close to (phi * N). More precisely:
    +   *
    +   *   floor((phi - epsi) * N) <= rank(x) <= ceil((phi + epsi) * N)
    +   *
    +   * Note on the algorithm used.
    +   *
    +   * This method implements a variation of the Greenwald-Khanna algorithm
    +   * (with some speed optimizations). The algorithm was first present in 
the following article:
    +   * "Space-efficient Online Computation of Quantile Summaries" by 
Greenwald, Michael
    +   * and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670)
    +   *
    +   * The performance optimizations are detailed in the comments of the 
implementation.
    +   *
    +   * @param df the dataframe to estimate quantiles on
    +   * @param col the name of the column
    +   * @param quantile the target quantile of interest
    +   * @param epsilon the target error. Should be >= 0.
    +   * */
    +  def approxQuantile(
    +      df: DataFrame,
    +      col: String,
    +      quantile: Double,
    +      epsilon: Double = QuantileSummaries.defaultEpsilon): Double = {
    +    require(quantile >= 0.0 && quantile <= 1.0, "Quantile must be in the 
range of (0.0, 1.0).")
    +    val Seq(Seq(res)) = multipleApproxQuantiles(df, Seq(col), 
Seq(quantile), epsilon)
    +    res
    +  }
    +
    +  /**
    +   * Runs multiple quantile computations in a single pass, with the same 
target error.
    +   *
    +   * See [[approxQuantile)]] for more details on the approximation 
guarantees.
    +   *
    +   * @param df the dataframe
    +   * @param cols columns of the dataframe
    +   * @param quantiles target quantiles to compute
    +   * @param epsilon the precision to achieve
    +   * @return for each column, returns the requested approximations
    +   */
    +  def multipleApproxQuantiles(
    +      df: DataFrame,
    +      cols: Seq[String],
    +      quantiles: Seq[Double],
    +      epsilon: Double): Seq[Seq[Double]] = {
    +    val columns: Seq[Column] = cols.map { colName =>
    +      val field = df.schema(colName)
    +      require(field.dataType.isInstanceOf[NumericType],
    +        s"Quantile calculation for column $colName with data type 
${field.dataType}" +
    +        " is not supported.")
    +      Column(Cast(Column(colName).expr, DoubleType))
    +    }
    +    val emptySummaries = Array.fill(cols.size)(
    +      new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, 
epsilon))
    +
    +    // Note that it works more or less by accident as `rdd.aggregate` is 
not a pure function:
    +    // this function returns the same array as given in the input (because 
`aggregate` reuses
    +    // the same argument).
    +    def apply(summaries: Array[QuantileSummaries], row: Row): 
Array[QuantileSummaries] = {
    +      var i = 0
    +      while (i < summaries.length) {
    +        summaries(i) = summaries(i).insert(row.getDouble(i))
    +        i += 1
    +      }
    +      summaries
    +    }
    +
    +    def merge(
    +        sum1: Array[QuantileSummaries],
    +        sum2: Array[QuantileSummaries]): Array[QuantileSummaries] = {
    +      sum1.zip(sum2).map { case (s1, s2) => 
s1.compress().merge(s2.compress()) }
    +    }
    +    val summaries = df.select(columns: 
_*).rdd.aggregate(emptySummaries)(apply, merge)
    +
    +    summaries.map { summary => quantiles.map(summary.query) }
    +  }
    +
    +  /**
    +   * Helper class to compute approximate quantile summary.
    +   * This implementation is based on the algorithm proposed in the paper:
    +   * "Space-efficient Online Computation of Quantile Summaries" by 
Greenwald, Michael
    +   * and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670)
    +   *
    +   * In order to optimize for speed, it maintains an internal buffer of 
the last seen samples,
    +   * and only inserts them after crossing a certain size threshold. This 
guarantees a near-constant
    +   * runtime complexity compared to the original algorithm.
    +   *
    +   * @param compressThreshold the compression threshold: after the 
internal buffer of statistics
    +   *                          crosses this size, it attempts to compress 
the statistics together
    +   * @param epsilon the target precision
    +   * @param sampled a buffer of quantile statistics. See the G-K article 
for more details
    +   * @param count the count of all the elements *inserted in the sampled 
buffer*
    +   *              (excluding the head buffer)
    +   * @param headSampled a buffer of latest samples seen so far
    +   */
    +  class QuantileSummaries(
    +      val compressThreshold: Int,
    +      val epsilon: Double,
    +      val sampled: ArrayBuffer[Stats] = ArrayBuffer.empty,
    +      private[stat] var count: Long = 0L,
    +      val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty) extends 
Serializable {
    +
    +    import QuantileSummaries._
    +
    +    def insert(x: Double): QuantileSummaries = {
    +      headSampled.append(x)
    +      if (headSampled.size >= defaultHeadSize) {
    +        this.withHeadInserted
    +      } else {
    +        this
    +      }
    +    }
    +
    +    /**
    +     * Inserts an array of (unsorted samples) in a batch, sorting the 
array first to traverse
    +     * the summary statistics in a single batch.
    +     *
    +     * This method does not modify the current object and returns if 
necessary a new copy.
    +     *
    +     * @return a new quantile summary object.
    +     */
    +    private def withHeadInserted: QuantileSummaries = {
    +      if (headSampled.isEmpty) {
    +        return this
    +      }
    +      var currentCount = count
    +      val sorted = headSampled.toArray.sorted
    +      val newSamples: ArrayBuffer[Stats] = new ArrayBuffer[Stats]()
    +      // The index of the next element to insert
    +      var sampleIdx = 0
    +      // The index of the sample currently being inserted.
    +      var opsIdx: Int = 0
    +      while(opsIdx < sorted.length) {
    +        val currentSample = sorted(opsIdx)
    +        // Add all the samples before the next observation.
    +        while(sampleIdx < sampled.size && sampled(sampleIdx).value <= 
currentSample) {
    +          newSamples.append(sampled(sampleIdx))
    +          sampleIdx += 1
    +        }
    +
    +        // If it is the first one to insert, of if it is the last one
    +        currentCount += 1
    +        val delta =
    +          if (newSamples.isEmpty || (sampleIdx == sampled.size && opsIdx 
== sorted.length - 1)) {
    +            0
    +          } else {
    +            math.floor(2 * epsilon * currentCount).toInt
    +          }
    +
    +        val tuple = Stats(currentSample, 1, delta)
    +        newSamples.append(tuple)
    +        opsIdx += 1
    +      }
    +
    +      // Add all the remaining existing samples
    +      while(sampleIdx < sampled.size) {
    +        newSamples.append(sampled(sampleIdx))
    +        sampleIdx += 1
    +      }
    +      new QuantileSummaries(compressThreshold, epsilon, newSamples, 
currentCount)
    +    }
    +
    +    def compress(): QuantileSummaries = {
    +      // Inserts all the elements first
    +      val inserted = this.withHeadInserted
    +      assert(inserted.headSampled.isEmpty)
    +      assert(inserted.count == count + headSampled.size)
    +      val compressed =
    +        compressImmut(inserted.sampled, mergeThreshold = 2 * epsilon * 
inserted.count)
    +      new QuantileSummaries(compressThreshold, epsilon, compressed, 
inserted.count)
    +    }
    +
    +    def merge(other: QuantileSummaries): QuantileSummaries = {
    +      if (other.count == 0) {
    +        this
    +      } else if (count == 0) {
    +        other
    +      } else {
    +        // We rely on the fact that they are ordered to efficiently 
interleave them.
    +        val thisSampled = sampled.toList
    +        val otherSampled = other.sampled.toList
    --- End diff --
    
    I agree that the current implementation is too complicated, and that 
probably just merging/sorting the two arrays directly is more efficient for the 
size considered.
    
    When running some performance testing, the cost of the algorithm was 
dominated by the cost of accessing the content of Rows. Only 4% of the running 
time was spent on insertion+merging, so this cost was negligible at this point.
    
    I am going to do as you suggest. If it happens to be a bottleneck when we 
use UDAFs later, directly manipulating ArrayBuffers would be more efficient 
than pattern-matching on lists anyway. Rerunning the synthetic benchmark with 
the suggested changes did not yield runtime changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to