Github user scwf commented on a diff in the pull request:
https://github.com/apache/spark/pull/5604#discussion_r29730645
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
---
@@ -529,6 +532,203 @@ class Analyzer(
makeGeneratorOutput(p.generator, p.generatorOutput), p.child)
}
}
+
+ /**
+ * Extracts [[WindowExpression]]s from the projectList of a [[Project]]
operator and
+ * aggregateExpressions of an [[Aggregate]] operator and creates
individual [[Window]]
+ * operators for every distinct [[WindowSpecDefinition]].
+ *
+ * This rule handles three cases:
+ * - A [[Project]] having [[WindowExpression]]s in its projectList;
+ * - An [[Aggregate]] having [[WindowExpression]]s in its
aggregateExpressions.
+ * - A [[Filter]]->[[Aggregate]] pattern representing GROUP BY with a
HAVING
+ * clause, where the [[Aggregate]] has [[WindowExpression]]s in its
aggregateExpressions.
+ * Note: If there is a GROUP BY clause in the query, aggregations and
corresponding
+ * filters (expressions in the HAVING clause) should be evaluated before
any
+ * [[WindowExpression]]. If a query has SELECT DISTINCT, the DISTINCT
part should be
+ * evaluated after all [[WindowExpression]]s.
+ *
+ * For every case, the transformation works as follows:
+ * 1. For a list of [[Expression]]s (a projectList or an
aggregateExpressions), partitions
+ * it into two lists of [[Expression]]s, one for all [[WindowExpression]]s
and another for
+ * all regular expressions.
+ * 2. For all [[WindowExpression]]s, groups them based on their
[[WindowSpecDefinition]]s.
+ * 3. For every distinct [[WindowSpecDefinition]], creates a [[Window]]
operator and inserts
+ * it into the plan tree.
+ */
+ object ExtractWindowExpressions extends Rule[LogicalPlan] {
+ def hasWindowFunction(projectList: Seq[NamedExpression]): Boolean =
+ projectList.exists(hasWindowFunction)
+
+ def hasWindowFunction(expr: NamedExpression): Boolean = {
+ expr.find {
+ case window: WindowExpression => true
+ case _ => false
+ }.isDefined
+ }
+
+ /**
+ * From a Seq of [[NamedExpression]]s, extract window expressions and
+ * other regular expressions.
+ */
+ def extract(
+ expressions: Seq[NamedExpression]): (Seq[NamedExpression],
Seq[NamedExpression]) = {
+ // First, we simply partition the input expressions into two parts, one
having
+ // WindowExpressions and another one without WindowExpressions.
+ val (windowExpressions, regularExpressions) =
expressions.partition(hasWindowFunction)
+
+ // Then, we need to extract those regular expressions used in the
WindowExpression.
+ // For example, when we have col1 - Sum(col2 + col3) OVER (PARTITION
BY col4 ORDER BY col5),
+ // we need to make sure that col1 to col5 are all projected from the
child of the Window
+ // operator.
+ val extractedExprBuffer = new ArrayBuffer[NamedExpression]()
+ def extractExpr(expr: Expression): Expression = expr match {
+ case ne: NamedExpression =>
+ // If a named expression is not in regularExpressions, we
extract it and replace it
+ // with an AttributeReference.
+ val missingExpr =
+ AttributeSet(Seq(expr)) -- (regularExpressions ++
extractedExprBuffer)
+ if (missingExpr.nonEmpty) {
+ extractedExprBuffer += ne
+ }
+ ne.toAttribute
+ case e: Expression if e.foldable =>
+ e // No need to create an attribute reference if it will be
evaluated as a Literal.
+ case e: Expression =>
+ // For other expressions, we extract it and replace it with an
AttributeReference (with
+ // an internal column name, e.g. "_w0").
+ val withName = Alias(e, s"_w${extractedExprBuffer.length}")()
+ extractedExprBuffer += withName
+ withName.toAttribute
+ }
+
+ // Now, we extract expressions from windowExpressions by using
extractExpr.
+ val newWindowExpressions = windowExpressions.map {
+ _.transform {
+ // Extracts children expressions of a WindowFunction (input
parameters of
+ // a WindowFunction).
+ case wf : WindowFunction =>
+ val newChildren = wf.children.map(extractExpr(_))
+ wf.withNewChildren(newChildren)
+
+ // Extracts expressions from the partition spec and order spec.
+ case wsc @ WindowSpecDefinition(partitionSpec, orderSpec, _) =>
+ val newPartitionSpec = partitionSpec.map(extractExpr(_))
+ val newOrderSpec = orderSpec.map { so =>
+ val newChild = extractExpr(so.child)
+ so.copy(child = newChild)
+ }
+ wsc.copy(partitionSpec = newPartitionSpec, orderSpec =
newOrderSpec)
+
+ // Extracts AggregateExpression. For example, for SUM(x) -
Sum(y) OVER (...),
+ // we need to extract SUM(x).
+ case agg: AggregateExpression =>
+ val withName = Alias(agg, s"_w${extractedExprBuffer.length}")()
+ extractedExprBuffer += withName
+ withName.toAttribute
+ }.asInstanceOf[NamedExpression]
+ }
+
+ (newWindowExpressions, regularExpressions ++ extractedExprBuffer)
+ }
+
+ /**
+ * Adds operators for Window Expressions. Every Window operator
handles a single Window Spec.
+ */
+ def addWindow(windowExpressions: Seq[NamedExpression], child:
LogicalPlan): LogicalPlan = {
+ // First, we group window expressions based on their Window Spec.
+ val groupedWindowExpression = windowExpressions.groupBy { expr =>
+ val windowExpression = expr.find {
+ case window: WindowExpression => true
+ case other => false
+ }.map(_.asInstanceOf[WindowExpression].windowSpec)
+ windowExpression.getOrElse(
+ failAnalysis(s"$windowExpressions does not have any
WindowExpression."))
+ }.toSeq
+
+ // For every Window Spec, we add a Window operator and set
currentChild as the child of it.
+ var currentChild = child
+ var i = 0
+ while (i < groupedWindowExpression.size) {
+ val (windowSpec, windowExpressions) = groupedWindowExpression(i)
+ // Set currentChild to the newly created Window operator.
+ currentChild = Window(currentChild.output, windowExpressions,
windowSpec, currentChild)
+
+ // Move to next WindowExpression.
+ i += 1
+ }
+
+ // We return the top operator.
+ currentChild
+ }
+
+ // We have to use transformDown here to make sure the rule for
+ // "Aggregate with Having clause" will be triggered.
+ def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+ // Lookup WindowSpecDefinitions. This rule works with unresolved
children.
+ case WithWindowDefinition(windowDefinitions, child) =>
+ child.transform {
+ case plan => plan.transformExpressions {
+ case UnresolvedWindowExpression(c,
WindowSpecReference(windowName)) =>
+ val errorMessage =
+ s"Window specification $windowName is not defined in the
WINDOW clause."
+ val windowSpecDefinition =
+ windowDefinitions
+ .get(windowName)
+ .getOrElse(failAnalysis(errorMessage))
+ WindowExpression(c, windowSpecDefinition)
+ }
+ }
+
+ // Aggregate with Having clause. This rule works with an unresolved
Aggregate because
+ // a resolved Aggregate will not have Window Functions.
+ case f @ Filter(condition, a @ Aggregate(groupingExprs,
aggregateExprs, child))
+ if child.resolved &&
+ hasWindowFunction(aggregateExprs) &&
+ !a.expressions.exists(!_.resolved) =>
--- End diff --
minor: `a.expressions.forall(_.resolved)` would be more readable than `!a.expressions.exists(!_.resolved)`.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]