maropu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r436311072



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -510,10 +510,12 @@ class Analyzer(
       // collect all the found AggregateExpression, so we can check an 
expression is part of
       // any AggregateExpression or not.
       val aggsBuffer = ArrayBuffer[Expression]()
+

Review comment:
       nit: Please revert the unnecessary changes.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
##########
@@ -3496,6 +3496,88 @@ class SQLQuerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
     checkIfSeedExistsInExplain(df2)
   }
 
+  test("SPARK-31670: Struct Field in groupByExpr with CUBE") {

Review comment:
       Could you please make the title more accurate? I don't think we need the 
words `with CUBE`.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -1479,11 +1486,70 @@ class Analyzer(
       // Skip the having clause here, this will be handled in 
ResolveAggregateFunctions.
       case h: UnresolvedHaving => h
 
+      case p: LogicalPlan if needResolveStructField(p) =>
+        logTrace(s"Attempting to resolve 
${p.simpleString(SQLConf.get.maxToStringFields)}")
+        val resolved = p.mapExpressions(resolveExpressionTopDown(_, p))
+        val structFieldMap = new mutable.HashMap[String, Alias]
+        resolved.transformExpressions {
+          case a @ Alias(struct: GetStructField, _) =>
+            if (structFieldMap.contains(struct.sql)) {
+              val exprId = structFieldMap.getOrElse(struct.sql, a).exprId
+              Alias(a.child, a.name)(exprId, a.qualifier, a.explicitMetadata)
+            } else {
+              structFieldMap.put(struct.sql, a)
+              a
+            }
+          case e => e
+        }
+
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve 
${q.simpleString(SQLConf.get.maxToStringFields)}")
         q.mapExpressions(resolveExpressionTopDown(_, q))
     }
 
+    def needResolveStructField(plan: LogicalPlan): Boolean = {
+      plan match {
+        case UnresolvedHaving(havingCondition, a: Aggregate)
+          if 
containSameStructFields(a.groupingExpressions.flatMap(_.references),
+            a.aggregateExpressions.flatMap(_.references),
+            Some(havingCondition.references.toSeq)) => true
+        case Aggregate(groupingExpressions, aggregateExpressions, _)
+          if containSameStructFields(groupingExpressions.flatMap(_.references),
+            aggregateExpressions.flatMap(_.references)) => true
+        case GroupingSets(selectedGroupByExprs, groupByExprs, _, aggregations)
+          if containSameStructFields(groupByExprs.flatMap(_.references),
+            aggregations.flatMap(_.references),
+            Some(selectedGroupByExprs.flatMap(_.flatMap(_.references)))) => 
true
+        case _ => false
+      }
+    }
+
+    def containSameStructFields(

Review comment:
       Is it not enough here to just check whether both sides (`grpExprs` and 
`aggExprs`) contain struct fields? Don't we need to confirm that they are 
identical by using the unresolved attributes?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -1479,11 +1486,70 @@ class Analyzer(
       // Skip the having clause here, this will be handled in 
ResolveAggregateFunctions.
       case h: UnresolvedHaving => h
 
+      case p: LogicalPlan if needResolveStructField(p) =>
+        logTrace(s"Attempting to resolve 
${p.simpleString(SQLConf.get.maxToStringFields)}")
+        val resolved = p.mapExpressions(resolveExpressionTopDown(_, p))
+        val structFieldMap = new mutable.HashMap[String, Alias]

Review comment:
       `mutable.HashMap` -> `mutable.Map`

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -1479,11 +1486,70 @@ class Analyzer(
       // Skip the having clause here, this will be handled in 
ResolveAggregateFunctions.
       case h: UnresolvedHaving => h
 
+      case p: LogicalPlan if needResolveStructField(p) =>
+        logTrace(s"Attempting to resolve 
${p.simpleString(SQLConf.get.maxToStringFields)}")
+        val resolved = p.mapExpressions(resolveExpressionTopDown(_, p))
+        val structFieldMap = new mutable.HashMap[String, Alias]
+        resolved.transformExpressions {
+          case a @ Alias(struct: GetStructField, _) =>
+            if (structFieldMap.contains(struct.sql)) {
+              val exprId = structFieldMap.getOrElse(struct.sql, a).exprId
+              Alias(a.child, a.name)(exprId, a.qualifier, a.explicitMetadata)
+            } else {
+              structFieldMap.put(struct.sql, a)
+              a
+            }
+          case e => e
+        }
+
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve 
${q.simpleString(SQLConf.get.maxToStringFields)}")
         q.mapExpressions(resolveExpressionTopDown(_, q))
     }
 
+    def needResolveStructField(plan: LogicalPlan): Boolean = {
+      plan match {
+        case UnresolvedHaving(havingCondition, a: Aggregate)
+          if 
containSameStructFields(a.groupingExpressions.flatMap(_.references),
+            a.aggregateExpressions.flatMap(_.references),
+            Some(havingCondition.references.toSeq)) => true
+        case Aggregate(groupingExpressions, aggregateExpressions, _)
+          if containSameStructFields(groupingExpressions.flatMap(_.references),
+            aggregateExpressions.flatMap(_.references)) => true
+        case GroupingSets(selectedGroupByExprs, groupByExprs, _, aggregations)
+          if containSameStructFields(groupByExprs.flatMap(_.references),
+            aggregations.flatMap(_.references),
+            Some(selectedGroupByExprs.flatMap(_.flatMap(_.references)))) => 
true
+        case _ => false
+      }
+    }
+
+    def containSameStructFields(

Review comment:
       `private`

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -1479,11 +1486,70 @@ class Analyzer(
       // Skip the having clause here, this will be handled in 
ResolveAggregateFunctions.
       case h: UnresolvedHaving => h
 
+      case p: LogicalPlan if needResolveStructField(p) =>
+        logTrace(s"Attempting to resolve 
${p.simpleString(SQLConf.get.maxToStringFields)}")
+        val resolved = p.mapExpressions(resolveExpressionTopDown(_, p))
+        val structFieldMap = new mutable.HashMap[String, Alias]
+        resolved.transformExpressions {
+          case a @ Alias(struct: GetStructField, _) =>
+            if (structFieldMap.contains(struct.sql)) {
+              val exprId = structFieldMap.getOrElse(struct.sql, a).exprId
+              Alias(a.child, a.name)(exprId, a.qualifier, a.explicitMetadata)
+            } else {
+              structFieldMap.put(struct.sql, a)
+              a
+            }
+          case e => e
+        }
+
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve 
${q.simpleString(SQLConf.get.maxToStringFields)}")
         q.mapExpressions(resolveExpressionTopDown(_, q))
     }
 
+    def needResolveStructField(plan: LogicalPlan): Boolean = {

Review comment:
       `private`

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
##########
@@ -3496,6 +3496,88 @@ class SQLQuerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
     checkIfSeedExistsInExplain(df2)
   }
 
+  test("SPARK-31670: Struct Field in groupByExpr with CUBE") {
+    withTable("t") {
+      sql(
+        """CREATE TABLE t(
+          |a STRING,
+          |b INT,
+          |c ARRAY<STRUCT<row_id:INT,json_string:STRING>>,
+          |d ARRAY<ARRAY<STRING>>,
+          |e ARRAY<MAP<STRING, INT>>)
+          |USING ORC""".stripMargin)
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, each.json_string, SUM(b)
+            |FROM t
+            |LATERAL VIEW EXPLODE(c) x AS each
+            |GROUP BY a, each.json_string
+            |WITH CUBE
+            |""".stripMargin), Nil)
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, get_json_object(each.json_string, '$.i'), SUM(b)
+            |FROM t
+            |LATERAL VIEW EXPLODE(c) X AS each
+            |GROUP BY a, get_json_object(each.json_string, '$.i')
+            |WITH CUBE
+            |""".stripMargin), Nil)
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, each.json_string AS json_string, SUM(b)
+            |FROM t
+            |LATERAL VIEW EXPLODE(c) x AS each

Review comment:
       By the way, do we need `lateral view` to reproduce this issue? I mean, 
can this issue happen without `lateral view`?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
##########
@@ -3496,6 +3496,88 @@ class SQLQuerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
     checkIfSeedExistsInExplain(df2)
   }
 
+  test("SPARK-31670: Struct Field in groupByExpr with CUBE") {
+    withTable("t") {
+      sql(
+        """CREATE TABLE t(
+          |a STRING,
+          |b INT,
+          |c ARRAY<STRUCT<row_id:INT,json_string:STRING>>,
+          |d ARRAY<ARRAY<STRING>>,
+          |e ARRAY<MAP<STRING, INT>>)
+          |USING ORC""".stripMargin)

Review comment:
       See: https://github.com/apache/spark/pull/28490#discussion_r434371366

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
##########
@@ -3496,6 +3496,88 @@ class SQLQuerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
     checkIfSeedExistsInExplain(df2)
   }
 
+  test("SPARK-31670: Struct Field in groupByExpr with CUBE") {
+    withTable("t") {
+      sql(
+        """CREATE TABLE t(
+          |a STRING,
+          |b INT,
+          |c ARRAY<STRUCT<row_id:INT,json_string:STRING>>,
+          |d ARRAY<ARRAY<STRING>>,
+          |e ARRAY<MAP<STRING, INT>>)
+          |USING ORC""".stripMargin)
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, each.json_string, SUM(b)
+            |FROM t
+            |LATERAL VIEW EXPLODE(c) x AS each
+            |GROUP BY a, each.json_string
+            |WITH CUBE
+            |""".stripMargin), Nil)
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, get_json_object(each.json_string, '$.i'), SUM(b)
+            |FROM t
+            |LATERAL VIEW EXPLODE(c) X AS each
+            |GROUP BY a, get_json_object(each.json_string, '$.i')
+            |WITH CUBE
+            |""".stripMargin), Nil)
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, each.json_string AS json_string, SUM(b)
+            |FROM t
+            |LATERAL VIEW EXPLODE(c) x AS each
+            |GROUP BY a, each.json_string
+            |WITH CUBE
+            |""".stripMargin), Nil)
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, each.json_string as js, SUM(b)
+            |FROM t
+            |LATERAL VIEW EXPLODE(c) X AS each
+            |GROUP BY a, each.json_string
+            |WITH CUBE
+            |""".stripMargin), Nil)
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, each.json_string as js, SUM(b)
+            |FROM t
+            |LATERAL VIEW EXPLODE(c) X AS each
+            |GROUP BY a, each.json_string
+            |WITH ROLLUP
+            |""".stripMargin), Nil)
+
+      sql(
+        """
+          |SELECT a, each.json_string, SUM(b)
+          |FROM t
+          |LATERAL VIEW EXPLODE(c) X AS each
+          |GROUP BY a, each.json_string
+          |GROUPING sets((a),(a, each.json_string))
+          |""".stripMargin).explain(true)
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, each.json_string, SUM(b)
+            |FROM t
+            |LATERAL VIEW EXPLODE(c) X AS each
+            |GROUP BY a, each.json_string
+            |GROUPING sets((a),(a, each.json_string))
+            |""".stripMargin), Nil)

Review comment:
       Could you add tests with queries that use `HAVING` clauses?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -1479,11 +1486,70 @@ class Analyzer(
       // Skip the having clause here, this will be handled in 
ResolveAggregateFunctions.
       case h: UnresolvedHaving => h
 
+      case p: LogicalPlan if needResolveStructField(p) =>
+        logTrace(s"Attempting to resolve 
${p.simpleString(SQLConf.get.maxToStringFields)}")
+        val resolved = p.mapExpressions(resolveExpressionTopDown(_, p))
+        val structFieldMap = new mutable.HashMap[String, Alias]
+        resolved.transformExpressions {
+          case a @ Alias(struct: GetStructField, _) =>
+            if (structFieldMap.contains(struct.sql)) {
+              val exprId = structFieldMap.getOrElse(struct.sql, a).exprId
+              Alias(a.child, a.name)(exprId, a.qualifier, a.explicitMetadata)
+            } else {
+              structFieldMap.put(struct.sql, a)
+              a
+            }
+          case e => e
+        }
+
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve 
${q.simpleString(SQLConf.get.maxToStringFields)}")
         q.mapExpressions(resolveExpressionTopDown(_, q))
     }
 
+    def needResolveStructField(plan: LogicalPlan): Boolean = {
+      plan match {
+        case UnresolvedHaving(havingCondition, a: Aggregate)
+          if 
containSameStructFields(a.groupingExpressions.flatMap(_.references),
+            a.aggregateExpressions.flatMap(_.references),
+            Some(havingCondition.references.toSeq)) => true
+        case Aggregate(groupingExpressions, aggregateExpressions, _)
+          if containSameStructFields(groupingExpressions.flatMap(_.references),
+            aggregateExpressions.flatMap(_.references)) => true
+        case GroupingSets(selectedGroupByExprs, groupByExprs, _, aggregations)
+          if containSameStructFields(groupByExprs.flatMap(_.references),
+            aggregations.flatMap(_.references),
+            Some(selectedGroupByExprs.flatMap(_.flatMap(_.references)))) => 
true
+        case _ => false
+      }
+    }
+
+    def containSameStructFields(
+        grpExprs: Seq[Attribute],

Review comment:
       nit: `grpExprs` -> `groupExprs` for consistency.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to