Github user gatorsmile commented on the pull request:
https://github.com/apache/spark/pull/11555#issuecomment-193135603
@cloud-fan The issue is much more complex in my implementation. As you saw
in the JIRA, I originally wanted to add an extra `SubqueryAlias` between each
`Window`. However, I hit a couple of problems caused by `SubqueryAlias`, so I
finally decided to recover the original SQL statement instead. Below is my
code draft, before any cleanup.
```scala
/**
 * Walks down a chain of directly nested [[Window]] operators starting at `plan` and collects
 * all of their window expressions.
 *
 * Expressions are ordered innermost window first, i.e. a window's expressions precede those of
 * the windows stacked above it. NOTE(review): this is meant to mirror the column order in which
 * nested Catalyst windows append their outputs (`child.output ++ windowExpressions`) — confirm.
 *
 * @param plan        the window operator to start from
 * @param windowExprs window expressions already collected from windows above `plan`
 * @return the first descendant that is not a [[Window]], together with the collected
 *         window expressions
 */
@scala.annotation.tailrec
private def getAllWindowExprs(
    plan: Window,
    windowExprs: ArrayBuffer[NamedExpression]): (LogicalPlan, ArrayBuffer[NamedExpression]) = {
  // Prepend this window's expressions so that inner windows come first in the result.
  val collected = ArrayBuffer(plan.windowExpressions: _*)
  collected ++= windowExprs
  plan.child match {
    case w: Window => getAllWindowExprs(w, collected)
    case other => (other, collected)
  }
}
/**
 * Inlines aliases into window expressions.
 *
 * Every [[AttributeReference]] inside `windowExprs` that refers to an [[Alias]] defined in
 * `projectList` is replaced by that alias's child expression. References that do not match an
 * alias are left untouched.
 *
 * @param projectList the output list of the Project or Aggregate below the window(s)
 * @param windowExprs the window expressions to rewrite
 * @return the rewritten window expressions, in the same order as `windowExprs`
 */
private def replaceAliasedByExpr(
    projectList: Seq[NamedExpression],
    windowExprs: Seq[NamedExpression]): Seq[Expression] = {
  val aliasedChildren = AttributeMap(projectList.collect {
    case alias: Alias => alias.toAttribute -> alias.child
  })
  val inlineAlias: PartialFunction[Expression, Expression] = {
    case ref: AttributeReference if aliasedChildren.contains(ref) => aliasedChildren(ref)
  }
  windowExprs.map(_.transformDown(inlineAlias))
}
/**
 * Builds the fragments of a SELECT statement that re-creates `plan` together with every
 * [[Window]] directly nested beneath it.
 *
 * @param plan the outermost window operator of the chain
 * @return a tuple of
 *         - the SELECT list SQL (the child's output list followed by the window expressions,
 *           with aliases from the child inlined),
 *         - the GROUP BY expressions SQL without the keyword (empty when there is no Aggregate),
 *         - the HAVING clause SQL including the `HAVING` keyword (empty when there is none),
 *         - the plan to render in the FROM clause.
 * @throws UnsupportedOperationException if the first non-window descendant is not a Project,
 *         an Aggregate, or a Filter directly over an Aggregate
 */
private def buildProjectListForWindow(plan: Window): (String, String, String, LogicalPlan) = {
  // Gather the window expressions of all adjacent Window operators.
  val (child, windowExpressions) =
    getAllWindowExprs(plan, ArrayBuffer.empty[NamedExpression])
  child match {
    case p: Project =>
      val newWindowExpr = replaceAliasedByExpr(p.projectList, windowExpressions)
      ((p.projectList ++ newWindowExpr).map(_.sql).mkString(", "), "", "", p.child)
    case agg: Aggregate =>
      aggregateWindowSQL(agg, None, windowExpressions)
    case Filter(condition, agg: Aggregate) =>
      // A Filter directly above an Aggregate is rendered back as a HAVING clause.
      aggregateWindowSQL(agg, Some(condition), windowExpressions)
    case other =>
      // Fail with context instead of a bare MatchError.
      throw new UnsupportedOperationException(
        s"Cannot build a SELECT list for a Window over ${other.getClass.getSimpleName}")
  }
}

/**
 * Builds the SELECT, GROUP BY and HAVING fragments for window expressions evaluated on top of
 * the aggregate `agg`, optionally filtered by `havingCondition`.
 */
private def aggregateWindowSQL(
    agg: Aggregate,
    havingCondition: Option[Expression],
    windowExpressions: Seq[NamedExpression]): (String, String, String, LogicalPlan) = {
  val newWindowExpr = replaceAliasedByExpr(agg.aggregateExpressions, windowExpressions)
  val selectSQL = (agg.aggregateExpressions ++ newWindowExpr).map(_.sql).mkString(", ")
  val groupingSQL = agg.groupingExpressions.map(_.sql).mkString(", ")
  val havingSQL = havingCondition.fold("")(condition => "HAVING " + condition.sql)
  (selectSQL, groupingSQL, havingSQL, agg.child)
}
/**
 * Renders a chain of adjacent [[Window]] operators as a single SELECT statement.
 *
 * The FROM keyword is left empty when the source plan is `OneRowRelation`. The GROUP BY keyword
 * is emitted only when the operator below the windows contributed grouping expressions.
 */
private def windowToSQL(plan: Window): String = {
  val (selectList, groupingSQL, havingSQL, nextPlan) = buildProjectListForWindow(plan)
  val fromKeyword = if (nextPlan == OneRowRelation) "" else "FROM"
  val sourceSQL = toSQL(nextPlan)
  val groupByKeyword = if (groupingSQL.isEmpty) "" else "GROUP BY"
  // NOTE(review): `build` is defined elsewhere; presumably it joins the non-empty
  // fragments with spaces — confirm.
  build("SELECT", selectList, fromKeyword, sourceSQL, groupByKeyword, groupingSQL, havingSQL)
}
```
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]