WeichenXu123 commented on a change in pull request #25512: [SPARK-28782][SQL] 
Generator support in aggregate expressions
URL: https://github.com/apache/spark/pull/25512#discussion_r319955689
 
 

 ##########
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ##########
 @@ -2018,6 +2018,62 @@ class Analyzer(
         throw new AnalysisException("Only one generator allowed per select 
clause but found " +
           generators.size + ": " + generators.map(toPrettySQL).mkString(", "))
 
+      case Aggregate(_, aggList, _) if aggList.exists(hasNestedGenerator) =>
+        val nestedGenerator = aggList.find(hasNestedGenerator).get
+        throw new AnalysisException("Generators are not supported when it's 
nested in " +
+          "expressions, but got: " + toPrettySQL(trimAlias(nestedGenerator)))
+
+      case Aggregate(_, aggList, _) if aggList.count(hasGenerator) > 1 =>
+        val generators = aggList.filter(hasGenerator).map(trimAlias)
+        throw new AnalysisException("Only one generator allowed per aggregate 
clause but found " +
+          generators.size + ": " + generators.map(toPrettySQL).mkString(", "))
+
+      case agg @ Aggregate(groupList, aggList, child) if aggList.forall {
+        case AliasedGenerator(generator, _, _) => generator.childrenResolved
+        case other => other.resolved
+      } =>
+        // Holds the resolved generator, if one exists in the project list.
+        var generatorVisited = false
+
+        val projectExprs = Array.ofDim[NamedExpression](aggList.length)
+        val newAggList = aggList
+          
.map(CleanupAliases.trimNonTopLevelAliases(_).asInstanceOf[NamedExpression])
+          .zipWithIndex
+          .flatMap {
+            case (AliasedGenerator(generator, names, outer), idx) =>
+              // It's a sanity check, this should not happen as the previous 
case will throw
+              // exception earlier.
+              assert(!generatorVisited, "More than one generator found in 
aggregate.")
+              generatorVisited = true
+
+              val newGenChildren: Seq[Expression] = 
generator.children.zipWithIndex.map {
+                case (e, idx) => if (e.foldable) e else Alias(e, 
s"_gen_input_${idx}")()
+              }
+              val newGenerator = {
+                val g = generator.withNewChildren(newGenChildren.map { e =>
+                  if (e.foldable) e else e.asInstanceOf[Alias].toAttribute
+                }).asInstanceOf[Generator]
+                if (outer) GeneratorOuter(g) else g
+              }
+              val newAliasedGenerator = if (names.length == 1) {
+                Alias(newGenerator, names(0))()
+              } else {
+                MultiAlias(newGenerator, names)
+              }
+              projectExprs(idx) = newAliasedGenerator
+              
newGenChildren.filter(!_.foldable).asInstanceOf[Seq[NamedExpression]]
+            case (other, idx) =>
+              projectExprs(idx) = other.toAttribute
+              other :: Nil
+          }
+
+        if (generatorVisited) {
+          val newAgg = Aggregate(groupList, newAggList, child)
+          Project(projectExprs.toList, newAgg)
+        } else {
+          agg
+        }
+
       case p @ Project(projectList, child) =>
 
 Review comment:
   I prefer the current order of the `case` clauses.
   The first four clauses only perform validation checks, so it makes sense to keep them in front.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to