cloud-fan commented on code in PR #35041:
URL: https://github.com/apache/spark/pull/35041#discussion_r849613329
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala:
##########
@@ -325,3 +339,133 @@ case class Percentile(
frequencyExpression = newThird
)
}
+
+/**
+ * Returns a percentile value based on a continuous distribution of a
+ * numeric or ANSI interval column at the given percentage (specified in the
ORDER BY clause).
+ * The value of percentage must be between 0.0 and 1.0.
+ */
+case class PercentileCont(left: Expression, right: Expression)
+ extends AggregateFunction
+ with RuntimeReplaceableAggregate
+ with ImplicitCastInputTypes
+ with BinaryLike[Expression] {
+ private lazy val percentile = new Percentile(left, right)
+ override def replacement: Expression = percentile
+ override def nodeName: String = "percentile_cont"
+ override def inputTypes: Seq[AbstractDataType] = percentile.inputTypes
+ override protected def withNewChildrenInternal(
+ newLeft: Expression, newRight: Expression): PercentileCont =
+ this.copy(left = newLeft, right = newRight)
+}
+
+/**
+ * The PercentileDisc aggregate function returns the percentile(s) based on a
discrete distribution of
+ * the numeric column `expr` at the given percentage(s), with values ranging
in [0.0, 1.0].
+ *
+ * Because the number of elements and their partial order cannot be determined
in advance,
+ * we have to store all the elements in memory; note that too many elements
can
+ * cause GC pauses and eventually an OutOfMemoryError.
+ */
+case class PercentileDisc private(
+ child: Expression,
+ percentageExpression: Expression,
+ frequencyExpression: Expression,
+ reverse: Boolean = false,
+ mutableAggBufferOffset: Int = 0,
+ inputAggBufferOffset: Int = 0) extends PercentileBase {
+
+ def this(child: Expression, percentageExpression: Expression) = {
+ this(child, percentageExpression, Literal(1L))
+ }
+
+ def this(child: Expression, percentageExpression: Expression, reverse:
Boolean) = {
+ this(child, percentageExpression, Literal(1L), reverse)
+ }
+
+ override def checkInputDataTypes(): TypeCheckResult = {
+ val defaultCheck = super.checkInputDataTypes()
+ if (defaultCheck.isFailure) {
+ defaultCheck
+ } else {
+ assert(frequencyExpression == Literal(1L))
+ TypeCheckSuccess
+ }
+ }
+
+ override def prettyName: String = "percentile_disc"
+
+ /**
+ * This function has been based upon similar function from H2
+ * `org.h2.expression.aggregate.Percentile.getValue`.
+ */
+ override protected def getPercentiles(sortedCounts: Seq[(AnyRef, Long)]):
Seq[Double] = {
+ val passedSortedCounts = if (reverse) {
+ sortedCounts.reverse
+ } else {
+ sortedCounts
+ }
+ val maxPosition = passedSortedCounts.last._2 - 1
+
+ percentages.map { percentile =>
+ getPercentile(passedSortedCounts, maxPosition, percentile)
+ }
+ }
+
+ private def getPercentile(
+ sortedCounts: Seq[(AnyRef, Long)], maxPosition: Long, percentile:
Double): Double = {
+ val position = maxPosition * percentile
+ var lower = position.floor.toLong
+ val factor = position - lower
+ var higher = 0L
+ if (factor.signum == 0) {
+ higher = lower
+ } else {
+ higher = lower + 1
+ if (factor.compareTo(0.5) > 0) {
+ lower = higher
+ } else {
+ higher = lower
+ }
+ }
+
+ // Build the mapping between the index of the input data and the index of
the aggregated data
+ var accumulator = 0L
+ val idxMap = sortedCounts.zipWithIndex.map { case (c, i) =>
+ accumulator = accumulator + c._2
+ i -> accumulator
+ }.toMap
+
+ // Use binary search to find the lower position.
+ val countsArray = sortedCounts.map(_._2).toArray[Long]
+ var lowerIndex = binarySearchCount(countsArray, 0, sortedCounts.size,
lower)
Review Comment:
Does it make sense to call `binarySearchCount` without accumulated counts?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]