[GitHub] [spark] maropu commented on a change in pull request #28490: [SPARK-31670][SQL]Struct Field in groupByExpr with CUBE
maropu commented on a change in pull request #28490: URL: https://github.com/apache/spark/pull/28490#discussion_r436311072 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ## @@ -510,10 +510,12 @@ class Analyzer( // collect all the found AggregateExpression, so we can check an expression is part of // any AggregateExpression or not. val aggsBuffer = ArrayBuffer[Expression]() + Review comment: nit: please revert the unnecessary changes. (Unrelated changes might lead to revert/backport failures sometimes...) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #28490: [SPARK-31670][SQL]Struct Field in groupByExpr with CUBE
maropu commented on a change in pull request #28490: URL: https://github.com/apache/spark/pull/28490#discussion_r436311072 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ## @@ -510,10 +510,12 @@ class Analyzer( // collect all the found AggregateExpression, so we can check an expression is part of // any AggregateExpression or not. val aggsBuffer = ArrayBuffer[Expression]() + Review comment: nit: plz revert the unencessary changes. ## File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ## @@ -3496,6 +3496,88 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark checkIfSeedExistsInExplain(df2) } + test("SPARK-31670: Struct Field in groupByExpr with CUBE") { Review comment: Could you please make the title more correct? I think we don't need the word `with CUBE`. ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ## @@ -1479,11 +1486,70 @@ class Analyzer( // Skip the having clause here, this will be handled in ResolveAggregateFunctions. 
case h: UnresolvedHaving => h + case p: LogicalPlan if needResolveStructField(p) => +logTrace(s"Attempting to resolve ${p.simpleString(SQLConf.get.maxToStringFields)}") +val resolved = p.mapExpressions(resolveExpressionTopDown(_, p)) +val structFieldMap = new mutable.HashMap[String, Alias] +resolved.transformExpressions { + case a @ Alias(struct: GetStructField, _) => +if (structFieldMap.contains(struct.sql)) { + val exprId = structFieldMap.getOrElse(struct.sql, a).exprId + Alias(a.child, a.name)(exprId, a.qualifier, a.explicitMetadata) +} else { + structFieldMap.put(struct.sql, a) + a +} + case e => e +} + case q: LogicalPlan => logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}") q.mapExpressions(resolveExpressionTopDown(_, q)) } +def needResolveStructField(plan: LogicalPlan): Boolean = { + plan match { +case UnresolvedHaving(havingCondition, a: Aggregate) + if containSameStructFields(a.groupingExpressions.flatMap(_.references), +a.aggregateExpressions.flatMap(_.references), +Some(havingCondition.references.toSeq)) => true +case Aggregate(groupingExpressions, aggregateExpressions, _) + if containSameStructFields(groupingExpressions.flatMap(_.references), +aggregateExpressions.flatMap(_.references)) => true +case GroupingSets(selectedGroupByExprs, groupByExprs, _, aggregations) + if containSameStructFields(groupByExprs.flatMap(_.references), +aggregations.flatMap(_.references), +Some(selectedGroupByExprs.flatMap(_.flatMap(_.references => true +case _ => false + } +} + +def containSameStructFields( Review comment: It is not enough just to check if both sides (`grpExprs` and `aggExprs`) have struct fields here? We need to confirm the identity by using unresolved attributes? ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ## @@ -1479,11 +1486,70 @@ class Analyzer( // Skip the having clause here, this will be handled in ResolveAggregateFunctions. 
case h: UnresolvedHaving => h + case p: LogicalPlan if needResolveStructField(p) => +logTrace(s"Attempting to resolve ${p.simpleString(SQLConf.get.maxToStringFields)}") +val resolved = p.mapExpressions(resolveExpressionTopDown(_, p)) +val structFieldMap = new mutable.HashMap[String, Alias] Review comment: `mutable.HashMap` -> `mutable.Map` ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ## @@ -1479,11 +1486,70 @@ class Analyzer( // Skip the having clause here, this will be handled in ResolveAggregateFunctions. case h: UnresolvedHaving => h + case p: LogicalPlan if needResolveStructField(p) => +logTrace(s"Attempting to resolve ${p.simpleString(SQLConf.get.maxToStringFields)}") +val resolved = p.mapExpressions(resolveExpressionTopDown(_, p)) +val structFieldMap = new mutable.HashMap[String, Alias] +resolved.transformExpressions { + case a @ Alias(struct: GetStructField, _) => +if (structFieldMap.contains(struct.sql)) { + val exprId = structFieldMap.getOrElse(struct.sql, a).exprId + Alias(a.child, a.name)(exprId, a.qualifier, a.explicitMetadata) +} else { + structFieldMap.put(struct.sql, a) + a +} + case e => e +} + case q: LogicalPlan =>
[GitHub] [spark] maropu commented on a change in pull request #28490: [SPARK-31670][SQL]Struct Field in groupByExpr with CUBE
maropu commented on a change in pull request #28490: URL: https://github.com/apache/spark/pull/28490#discussion_r435029537 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ## @@ -506,31 +506,55 @@ class Analyzer( aggregations: Seq[NamedExpression], groupByAliases: Seq[Alias], groupingAttrs: Seq[Expression], -gid: Attribute): Seq[NamedExpression] = aggregations.map { - // collect all the found AggregateExpression, so we can check an expression is part of - // any AggregateExpression or not. - val aggsBuffer = ArrayBuffer[Expression]() - // Returns whether the expression belongs to any expressions in `aggsBuffer` or not. - def isPartOfAggregation(e: Expression): Boolean = { -aggsBuffer.exists(a => a.find(_ eq e).isDefined) +gid: Attribute): Seq[NamedExpression] = { + val resolvedGroupByAliases = groupByAliases.map(_.transformDown { Review comment: > The shown query plan above seems before ResolveGroupingAnalytics? So without CUBE is it possible to encounter similar issue? @maropu Yea, all the cases have the same issue; ``` scala> spark.range(1).selectExpr("'x' AS a", "1 AS b", "array(named_struct('row_id', 1, 'json_string', 'y')) AS c").createOrReplaceTempView("t") // ROLLUP scala> sql(""" | select a, coalesce(get_json_object(each.json_string,'$.iType'),'-127') as iType, sum(b) | from t | LATERAL VIEW explode(c) x AS each | group by a, get_json_object(each.json_string,'$.iType') | with rollup | """).show() org.apache.spark.sql.AnalysisException: expression 'x.`each`' is neither present in the group by, nor is it an aggregate function. 
Add to group by or wrap in first() (or first_value) if you don't care which value you get.;; Aggregate [a#17, get_json_object(each#9.json_string AS json_string#10, $.iType)#18, spark_grouping_id#16L], [a#17, coalesce(get_json_object(each#9.json_string, $.iType), -127) AS iType#8, sum(cast(b#3 as bigint)) AS sum(b)#13L] +- Expand [ArrayBuffer(a#2, b#3, c#4, each#9, a#14, get_json_object(each#9.json_string AS json_string#10, $.iType)#15, 0), ArrayBuffer(a#2, b#3, c#4, each#9, a#14, null, 1), ArrayBuffer(a#2, b#3, c#4, each#9, null, null, 3)], [a#2, b#3, c#4, each#9, a#17, get_json_object(each#9.json_string AS json_string#10, $.iType)#18, spark_grouping_id#16L] +- Project [a#2, b#3, c#4, each#9, a#2 AS a#14, get_json_object(each#9.json_string, $.iType) AS get_json_object(each#9.json_string AS json_string#10, $.iType)#15] +- Generate explode(c#4), false, x, [each#9] +- SubqueryAlias t +- Project [x AS a#2, 1 AS b#3, array(named_struct(row_id, 1, json_string, y)) AS c#4] +- Range (0, 1, step=1, splits=Some(4)) // GROUPING SETS scala> sql(""" | select a, coalesce(get_json_object(each.json_string,'$.iType'),'-127') as iType, sum(b) | from t | LATERAL VIEW explode(c) x AS each | group by grouping sets((a, get_json_object(each.json_string,'$.iType'))) | """).show() org.apache.spark.sql.AnalysisException: expression 'x.`each`' is neither present in the group by, nor is it an aggregate function. 
Add to group by or wrap in first() (or first_value) if you don't care which value you get.;; Aggregate [a#28, get_json_object(each#20.json_string AS json_string#21, $.iType)#29, spark_grouping_id#27L], [a#28, coalesce(get_json_object(each#20.json_string, $.iType), -127) AS iType#19, sum(cast(b#3 as bigint)) AS sum(b)#24L] +- Expand [ArrayBuffer(a#2, b#3, c#4, each#20, a#25, get_json_object(each#20.json_string AS json_string#21, $.iType)#26, 0)], [a#2, b#3, c#4, each#20, a#28, get_json_object(each#20.json_string AS json_string#21, $.iType)#29, spark_grouping_id#27L] +- Project [a#2, b#3, c#4, each#20, a#2 AS a#25, get_json_object(each#20.json_string, $.iType) AS get_json_object(each#20.json_string AS json_string#21, $.iType)#26] +- Generate explode(c#4), false, x, [each#20] +- SubqueryAlias t +- Project [x AS a#2, 1 AS b#3, array(named_struct(row_id, 1, json_string, y)) AS c#4] +- Range (0, 1, step=1, splits=Some(4)) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #28490: [SPARK-31670][SQL]Struct Field in groupByExpr with CUBE
maropu commented on a change in pull request #28490: URL: https://github.com/apache/spark/pull/28490#discussion_r434372717 ## File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ## @@ -3495,6 +3495,59 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark assert(df4.schema.head.name === "randn(1)") checkIfSeedExistsInExplain(df2) } + + test("SPARK-31670: Struct Field in groupByExpr with CUBE") { +withTable("t1") { + sql( +"""create table t1( + |a string, + |b int, + |c array>, + |d array>, + |e array>) + |using orc""".stripMargin) + + checkAnswer( +sql( + """ +|select a, each.json_string, sum(b) +|from t1 +|LATERAL VIEW explode(c) x AS each +|group by a, each.json_string +|with cube +|""".stripMargin), Nil) + + checkAnswer( +sql( + """ +|select a, get_json_object(each.json_string, '$.i'), sum(b) +|from t1 +|LATERAL VIEW explode(c) x AS each +|group by a, get_json_object(each.json_string, '$.i') +|with cube +|""".stripMargin), Nil) + + checkAnswer( +sql( + """ +|select a, each.json_string as json_string, sum(b) +|from t1 +|LATERAL VIEW explode(c) x AS each +|group by a, each.json_string +|with cube +|""".stripMargin), Nil) + + checkAnswer( +sql( + """ +|select a, each.json_string as js, sum(b) +|from t1 +|LATERAL VIEW explode(c) x AS each +|group by a, each.json_string +|with cube Review comment: Could you check the other analytics grouping, too, e.g., GROUPING SETS and ROLLUP? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #28490: [SPARK-31670][SQL]Struct Field in groupByExpr with CUBE
maropu commented on a change in pull request #28490: URL: https://github.com/apache/spark/pull/28490#discussion_r434371772 ## File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ## @@ -3495,6 +3495,59 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark assert(df4.schema.head.name === "randn(1)") checkIfSeedExistsInExplain(df2) } + + test("SPARK-31670: Struct Field in groupByExpr with CUBE") { +withTable("t1") { + sql( +"""create table t1( + |a string, + |b int, + |c array>, + |d array>, + |e array>) + |using orc""".stripMargin) + + checkAnswer( +sql( + """ +|select a, each.json_string, sum(b) Review comment: nit: Could you use uppercases for the SQL keywords where possible? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #28490: [SPARK-31670][SQL]Struct Field in groupByExpr with CUBE
maropu commented on a change in pull request #28490: URL: https://github.com/apache/spark/pull/28490#discussion_r434371366 ## File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ## @@ -3495,6 +3495,59 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark assert(df4.schema.head.name === "randn(1)") checkIfSeedExistsInExplain(df2) } + + test("SPARK-31670: Struct Field in groupByExpr with CUBE") { +withTable("t1") { + sql( +"""create table t1( + |a string, + |b int, + |c array>, + |d array>, + |e array>) + |using orc""".stripMargin) Review comment: Please use a temp view for test performance? Also, it's better to add some rows for answer checks in this test table instead of the current null table. ## File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ## @@ -3495,6 +3495,59 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark assert(df4.schema.head.name === "randn(1)") checkIfSeedExistsInExplain(df2) } + + test("SPARK-31670: Struct Field in groupByExpr with CUBE") { +withTable("t1") { + sql( +"""create table t1( + |a string, + |b int, + |c array>, + |d array>, + |e array>) + |using orc""".stripMargin) Review comment: Please use a temp view for test performance. Also, it's better to add some rows for answer checks in this test table instead of the current null table. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #28490: [SPARK-31670][SQL]Struct Field in groupByExpr with CUBE
maropu commented on a change in pull request #28490: URL: https://github.com/apache/spark/pull/28490#discussion_r434369978 ## File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ## @@ -3495,6 +3495,59 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark assert(df4.schema.head.name === "randn(1)") checkIfSeedExistsInExplain(df2) } + + test("SPARK-31670: Struct Field in groupByExpr with CUBE") { +withTable("t1") { Review comment: nit: `t1` -> `t` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #28490: [SPARK-31670][SQL]Struct Field in groupByExpr with CUBE
maropu commented on a change in pull request #28490: URL: https://github.com/apache/spark/pull/28490#discussion_r434366090 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ## @@ -506,31 +506,55 @@ class Analyzer( aggregations: Seq[NamedExpression], groupByAliases: Seq[Alias], groupingAttrs: Seq[Expression], -gid: Attribute): Seq[NamedExpression] = aggregations.map { - // collect all the found AggregateExpression, so we can check an expression is part of - // any AggregateExpression or not. - val aggsBuffer = ArrayBuffer[Expression]() - // Returns whether the expression belongs to any expressions in `aggsBuffer` or not. - def isPartOfAggregation(e: Expression): Boolean = { -aggsBuffer.exists(a => a.find(_ eq e).isDefined) +gid: Attribute): Seq[NamedExpression] = { + val resolvedGroupByAliases = groupByAliases.map(_.transformDown { Review comment: Probably, we should not fix this issue in `ResolveGroupingAnalytics`, but in `ResolveReferences` just like this; ``` object ResolveReferences extends Rule[LogicalPlan] { ... 
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { case p: LogicalPlan if !p.childrenResolved => p case a @ Aggregate(groupingExprs, aggExprs, _) if both `groupingExprs` and `aggExprs` have the same struct field => val newAgg = resolve expressions so that they have the same exprIds newAgg ``` The root cause seems to be that `ResolveReferences` assigns different exprIds to `each#30.json_string AS json_string`s (`#31` vs `#32`): ``` 20/06/03 16:22:47 WARN HiveSessionStateBuilder$$anon$1: === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences === !'Aggregate [cube('a, 'get_json_object('each.json_string, $.iType))], ['a, 'coalesce('get_json_object('each.json_string, $.iType), -127) AS iType#29, unresolvedalias('sum('b), None)] +- Generate explode(c#4), false, x, [each#30] +- SubqueryAlias t +- Project [x AS a#2, 1 AS b#3, array(named_struct(row_id, 1, json_string, y)) AS c#4] +- Range (0, 1, step=1, splits=Some(4)) 'Aggregate [cube(a#2, 'get_json_object(each#30.json_string AS json_string#31, $.iType))], [a#2, 'coalesce('get_json_object(each#30.json_string AS json_string#32, $.iType), -127) AS iType#29, unresolvedalias('sum(b#3), None)] +- Generate explode(c#4), false, x, [each#30] +- SubqueryAlias t +- Project [x AS a#2, 1 AS b#3, array(named_struct(row_id, 1, json_string, y)) AS c#4] +- Range (0, 1, step=1, splits=Some(4)) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #28490: [SPARK-31670][SQL]Struct Field in groupByExpr with CUBE
maropu commented on a change in pull request #28490: URL: https://github.com/apache/spark/pull/28490#discussion_r434366334 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ## @@ -506,31 +506,55 @@ class Analyzer( aggregations: Seq[NamedExpression], groupByAliases: Seq[Alias], groupingAttrs: Seq[Expression], -gid: Attribute): Seq[NamedExpression] = aggregations.map { - // collect all the found AggregateExpression, so we can check an expression is part of - // any AggregateExpression or not. - val aggsBuffer = ArrayBuffer[Expression]() - // Returns whether the expression belongs to any expressions in `aggsBuffer` or not. - def isPartOfAggregation(e: Expression): Boolean = { -aggsBuffer.exists(a => a.find(_ eq e).isDefined) +gid: Attribute): Seq[NamedExpression] = { + val resolvedGroupByAliases = groupByAliases.map(_.transformDown { Review comment: cc: @cloud-fan @viirya This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #28490: [SPARK-31670][SQL]Struct Field in groupByExpr with CUBE
maropu commented on a change in pull request #28490: URL: https://github.com/apache/spark/pull/28490#discussion_r434366090 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ## @@ -506,31 +506,55 @@ class Analyzer( aggregations: Seq[NamedExpression], groupByAliases: Seq[Alias], groupingAttrs: Seq[Expression], -gid: Attribute): Seq[NamedExpression] = aggregations.map { - // collect all the found AggregateExpression, so we can check an expression is part of - // any AggregateExpression or not. - val aggsBuffer = ArrayBuffer[Expression]() - // Returns whether the expression belongs to any expressions in `aggsBuffer` or not. - def isPartOfAggregation(e: Expression): Boolean = { -aggsBuffer.exists(a => a.find(_ eq e).isDefined) +gid: Attribute): Seq[NamedExpression] = { + val resolvedGroupByAliases = groupByAliases.map(_.transformDown { Review comment: Probably, we should not fix this issue in `ResolveGroupingAnalytics`, but in `ResolveReferences` just like this; ``` object ResolveReferences extends Rule[LogicalPlan] { ... def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { case p: LogicalPlan if !p.childrenResolved => p case a @ Aggregate(groupingExprs, aggExprs, _) if both `groupingExprs` and `aggExprs` have the same struct field => val newAgg = resolve expressions so that they have the same exprIds newAgg ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org