[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r480974176

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1326,24 +1326,49 @@ class Analyzer(
    * Note : In this routine, the unresolved attributes are resolved from the input plan's
    * children attributes.
    */
-  private def resolveExpressionTopDown(e: Expression, q: LogicalPlan): Expression = {
-    if (e.resolved) return e
-    e match {
-      case f: LambdaFunction if !f.bound => f
-      case u @ UnresolvedAttribute(nameParts) =>
-        // Leave unchanged if resolution fails. Hopefully will be resolved next round.
-        val result =
-          withPosition(u) {
-            q.resolveChildren(nameParts, resolver)
-              .orElse(resolveLiteralFunction(nameParts, u, q))
-              .getOrElse(u)
+  private def resolveExpressionTopDown(
+      e: Expression,
+      q: LogicalPlan,
+      trimAlias: Boolean = false): Expression = {
+
+    // Explain for param trimAlias and isTopLevel:

Review comment:
> The previous parameter doc of `resolveExpressionTopDown` is good. Why we turn it to comment here?

Since we don't have an `isTopLevel` param in `resolveExpressionTopDown`. Hmm, how about

```
   * @param e the expression need to be resolved.
   * @param q the LogicalPlan whose children are used to resolve expression's attribute.
   * @param trimAlias whether need to trim alias of Struct field. When true, we will trim
   *                  Struct field alias. When isTopLevel = true, we won't trim top-level
   *                  Struct field alias.
   * @return resolved Expression.
   */
  private def resolveExpressionTopDown(
      e: Expression,
      q: LogicalPlan,
      trimAlias: Boolean = false): Expression = {
    // Explain for param trimAlias and isTopLevel:
    // 1. trimAlias = false:
    //    We won't trim Struct Field alias.
    // 2. trimAlias = true && isTopLevel = false:
    //    We will trim all Struct field alias.
    // 3. trimAlias = true && isTopLevel = true
    //    Trim unnecessary alias of `GetStructField`. Note that, we cannot trim the alias of
    //    top-level `GetStructField`, as we should resolve `UnresolvedAttribute` to a named
    //    expression. The caller side can trim the alias of top-level `GetStructField` if needed.
    def innerResolve(
        e: Expression,
        isTopLevel: Boolean): Expression = {
      if (e.resolved) return e
```

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
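The three `trimAlias` / `isTopLevel` cases listed in the comment above can be sketched with a toy expression tree. This is only an illustration under stated assumptions: `Attr`, `Field`, `Named`, `Call` and `ResolveSketch` are hypothetical stand-ins invented for this note, not Catalyst classes, and the resolution logic is far simpler than what the analyzer does.

```scala
// Toy model only: these case classes are hypothetical stand-ins, not Catalyst classes.
sealed trait Expr
case class Attr(name: String) extends Expr                    // unresolved "struct.field" reference
case class Field(struct: String, field: String) extends Expr  // plays the role of GetStructField
case class Named(child: Expr, name: String) extends Expr      // plays the role of Alias
case class Call(fn: String, args: List[Expr]) extends Expr    // any enclosing function call

object ResolveSketch {
  // `isTopLevel` is owned by the nested helper, so callers can only choose `trimAlias`.
  def resolve(e: Expr, trimAlias: Boolean = false): Expr = {
    def innerResolve(expr: Expr, isTopLevel: Boolean): Expr = expr match {
      case Attr(name) if name.contains(".") =>
        val dot = name.indexOf('.')
        val field = Field(name.substring(0, dot), name.substring(dot + 1))
        // Cases 1 and 3 keep the alias; case 2 (trimAlias, not top level) drops it.
        if (trimAlias && !isTopLevel) field
        else Named(field, field.field)
      case Call(fn, args) =>
        // Everything below the root is, by construction, not top level.
        Call(fn, args.map(innerResolve(_, isTopLevel = false)))
      case other => other
    }
    innerResolve(e, isTopLevel = true)
  }
}
```

In this sketch a bare `Attr("c.json_string")` always resolves to a named expression, while the same attribute nested inside a `Call` loses its alias only when `trimAlias = true`.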
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r480873342

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1428,8 +1428,36 @@ class Analyzer(
       // SPARK-25942: Resolves aggregate expressions with `AppendColumns`'s children, instead of
       // `AppendColumns`, because `AppendColumns`'s serializer might produce conflict attribute
       // names leading to ambiguous references exception.
-      case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) =>
-        a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
+      case a: Aggregate =>
+        val planForResolve = a.child match {

Review comment:
> The comment is lost?

Oh, my mistake. Added it back.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r480858783

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1325,24 +1325,43 @@ class Analyzer(
    *
    * Note : In this routine, the unresolved attributes are resolved from the input plan's
    * children attributes.
+   *
+   * @param e the expression need to be resolved.
+   * @param q the LogicalPlan whose children are used to resolve expression's attribute.
+   * @param trimAlias whether need to trim alias of Struct field. When true, we will trim
+   *                  Struct field alias. When isTopLevel = true, we won't trim top-level
+   *                  Struct field alias.
+   * @param isTopLevel whether need to trim top-level alias of Struct field. this param is

Review comment:
> how about hiding this parameter from the caller side
>
> ```
> def resolveExpressionTopDown(e: Expression, q: LogicalPlan, trimAlias: Boolean = false) = {
>   if (e.resolved) return e
>   def innerResolve(expr: Expression, isTopLevel: Boolean) = expr match {
>     case ...
>     ...
>   }
>   innerResolve(e, isTopLevel = true)
> }
> ```

Hiding it is safer, but we need to keep `if (e.resolved) return e` inside `innerResolve`.
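The point about keeping the resolved check inside the nested helper can be illustrated with a small sketch: when the check lives in the recursive function, it short-circuits at every level of the tree, not only at the root. `Node`, `Leaf`, `Branch` and `InnerResolveSketch` are hypothetical names made up for this example and do not exist in Spark.

```scala
// Toy tree; hypothetical types, not Catalyst classes.
sealed trait Node { def resolved: Boolean }
case class Leaf(name: String, resolved: Boolean) extends Node
case class Branch(children: List[Node]) extends Node {
  def resolved: Boolean = children.forall(_.resolved)
}

object InnerResolveSketch {
  // Returns the resolved tree and the number of nodes that needed work.
  def resolveAll(root: Node): (Node, Int) = {
    var visited = 0
    def inner(n: Node): Node =
      if (n.resolved) n // the early exit applies to every subtree, not just the root
      else {
        visited += 1
        n match {
          case Leaf(name, _)    => Leaf(name, resolved = true)
          case Branch(children) => Branch(children.map(inner))
        }
      }
    (inner(root), visited)
  }
}
```

If the check were placed only in the outer method, already-resolved subtrees under an unresolved root would still be walked.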
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r480853032

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1325,24 +1325,43 @@ class Analyzer(
    *
    * Note : In this routine, the unresolved attributes are resolved from the input plan's
    * children attributes.
+   *
+   * @param e the expression need to be resolved.
+   * @param q the LogicalPlan who's children used to resolve expression's attribute.

Review comment:
> `whose children are used to ...`

Hmm, my poor English... Changed.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r480851500

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1325,24 +1325,43 @@ class Analyzer(
    *
    * Note : In this routine, the unresolved attributes are resolved from the input plan's
    * children attributes.
+   *
+   * @param e the expression need to be resolved.
+   * @param q the LogicalPlan use to resolve expression's attribute from.

Review comment:
> it's inaccurate. We resolve the given expression using `q.children`

Changed.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r480838692

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1326,7 +1326,11 @@ class Analyzer(
    * Note : In this routine, the unresolved attributes are resolved from the input plan's
    * children attributes.
    */
-  private def resolveExpressionTopDown(e: Expression, q: LogicalPlan): Expression = {
+  private def resolveExpressionTopDown(
+      e: Expression,
+      q: LogicalPlan,
+      trimAlias: Boolean = false,
+      isTopLevel: Boolean = false): Expression = {

Review comment:
> `isTopLevel` should be true by default, otherwise the semantic is wrong when callers call `resolveExpressionTopDown` without specifying `isTopLevel`.

Got your point. Letting the method itself control this param makes it safer.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r480838768

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1425,11 +1433,48 @@ class Analyzer(
       // rule: ResolveDeserializer.
       case plan if containsDeserializer(plan.expressions) => plan
 
-      // SPARK-25942: Resolves aggregate expressions with `AppendColumns`'s children, instead of
-      // `AppendColumns`, because `AppendColumns`'s serializer might produce conflict attribute
-      // names leading to ambiguous references exception.
-      case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) =>
-        a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
+      case a: Aggregate =>
+        val planForResolve = a.child match {
+          case appendColumns: AppendColumns => appendColumns
+          case _ => a
+        }
+
+        val resolvedGroupingExprs = a.groupingExpressions
+          .map(resolveExpressionTopDown(_, planForResolve, true))

Review comment:
> nit `trimAlias = true`

Done

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1425,11 +1433,48 @@ class Analyzer(
       // rule: ResolveDeserializer.
       case plan if containsDeserializer(plan.expressions) => plan
 
-      // SPARK-25942: Resolves aggregate expressions with `AppendColumns`'s children, instead of
-      // `AppendColumns`, because `AppendColumns`'s serializer might produce conflict attribute
-      // names leading to ambiguous references exception.
-      case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) =>
-        a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
+      case a: Aggregate =>
+        val planForResolve = a.child match {
+          case appendColumns: AppendColumns => appendColumns
+          case _ => a
+        }
+
+        val resolvedGroupingExprs = a.groupingExpressions
+          .map(resolveExpressionTopDown(_, planForResolve, true))
+          .map {
+            // trim Alias over top-level GetStructField
+            case Alias(s: GetStructField, _) => s
+            case other => other
+          }
+
+        val resolvedAggExprs = a.aggregateExpressions
+          .map(resolveExpressionTopDown(_, planForResolve, true, true))

Review comment:
> `isTopLevel` should be controlled by the method itself, not caller side.

Updated
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r480838228

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1326,7 +1326,11 @@ class Analyzer(
    * Note : In this routine, the unresolved attributes are resolved from the input plan's
    * children attributes.
    */
-  private def resolveExpressionTopDown(e: Expression, q: LogicalPlan): Expression = {
+  private def resolveExpressionTopDown(

Review comment:
> let's add comments to explain what the new parameters do

Done
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r480565224

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1425,11 +1436,33 @@ class Analyzer(
       // rule: ResolveDeserializer.
       case plan if containsDeserializer(plan.expressions) => plan
 
-      // SPARK-25942: Resolves aggregate expressions with `AppendColumns`'s children, instead of
-      // `AppendColumns`, because `AppendColumns`'s serializer might produce conflict attribute
-      // names leading to ambiguous references exception.
-      case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) =>
-        a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
+      case a: Aggregate =>
+        val planForResolve = a.child match {
+          case appendColumns: AppendColumns => appendColumns
+          case _ => a
+        }
+
+        val resolvedGroupingExprs = a.groupingExpressions
+          .map(resolveExpressionTopDown(_, planForResolve, true, false))
+
+        val resolvedAggExprs = a.aggregateExpressions
+          .map(resolveExpressionTopDown(_, planForResolve, true, true))
+          .map(_.asInstanceOf[NamedExpression])

Review comment:
Updated
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r480231511

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1583,6 +1611,35 @@ class Analyzer(
           failAnalysis(s"Invalid usage of '*' in expression '${o.prettyName}'")
       }
     }
+
+    // For aggregateExpression we need NamedExpression, so for top level Struct field
+    // Alias, we won't trim it, for non top level Struct field Alias, we need to trim
+    // it since we have trimmed groupByExpressions.
+    def trimNonTopLevelStructFieldAlias(expr: Expression): NamedExpression = {
+      expr match {
+        case alias @ Alias(_: GetStructField, _) => alias
+        case e => trimStructFieldAlias(e).asInstanceOf[NamedExpression]
+      }
+    }
+
+    // For struct field, it will be resolve as Alias(GetStructField, name),
+    // In Aggregate/GroupingSets this behavior will cause the same struct fields
+    // in aggExprs/groupExprs/selectedGroupByExprs be treated as different ones due to different
+    // ExprIds in Alias, and stops us finding the grouping expressions in aggExprs. Here we
+    // will use this method to remove Alias with different ExprId of GetStructField
+    // in groupByAlias/selectedGroupByExprs and aggregateExpressions
+    def trimStructFieldAlias(e: Expression): Expression = {
+      var fixed = ArrayBuffer[Alias]()
+      e.transformDown {
+        case a @ Alias(Alias(struct, _), name) if struct.isInstanceOf[GetStructField] =>

Review comment:
> ok things get complicated. How about we add a new flag to resolveExpressionTopDown, so that we can strip alias only for resolved column?
> and Add top-level Alias manually after calling resolveExpressionTopDown.

Updated. It really is complicated...
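The problem described in the code comment above (the same struct field resolved twice ends up under aliases with different ExprIds, so the grouping expression is no longer found inside the aggregate expression) can be reproduced in miniature. The sketch below is an assumption-laden toy: `Field`, `Named`, `Call`, `ExprIdSketch` and its counter are hypothetical and only mimic the idea of a fresh id per alias.

```scala
import java.util.concurrent.atomic.AtomicLong

// Toy model; hypothetical types, not Catalyst classes.
sealed trait Expr
case class Field(struct: String, field: String) extends Expr      // role of GetStructField
case class Named(child: Expr, name: String, id: Long) extends Expr // role of Alias with an ExprId
case class Call(fn: String, args: List[Expr]) extends Expr

object ExprIdSketch {
  private val nextId = new AtomicLong(0)

  // Every alias gets a fresh id, as each resolution pass would produce.
  def alias(child: Expr, name: String): Named = Named(child, name, nextId.getAndIncrement())

  // Drop aliases that sit directly on top of a struct field access.
  def trimFieldAlias(e: Expr): Expr = e match {
    case Named(f: Field, _, _) => f
    case Named(child, name, id) => Named(trimFieldAlias(child), name, id)
    case Call(fn, args) => Call(fn, args.map(trimFieldAlias))
    case other => other
  }
}
```

Two separately resolved copies of `f(c.x)` compare unequal until the inner aliases are trimmed, which is the mismatch the pull request sets out to remove.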
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r480211330

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1583,6 +1611,35 @@ class Analyzer(
           failAnalysis(s"Invalid usage of '*' in expression '${o.prettyName}'")
       }
     }
+
+    // For aggregateExpression we need NamedExpression, so for top level Struct field
+    // Alias, we won't trim it, for non top level Struct field Alias, we need to trim
+    // it since we have trimmed groupByExpressions.

Review comment:
> the comment relies on `trimStructFieldAlias`, can we move this method after that?

Done
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r480210028

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1428,8 +1428,36 @@ class Analyzer(
       // SPARK-25942: Resolves aggregate expressions with `AppendColumns`'s children, instead of
       // `AppendColumns`, because `AppendColumns`'s serializer might produce conflict attribute
       // names leading to ambiguous references exception.
-      case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) =>
-        a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
+      case a: Aggregate =>
+        val planForResolve = a.child match {

Review comment:
> nit: move the original comment `// [SPARK-25942](https://issues.apache.org/jira/browse/SPARK-25942): ` above this line.

Done
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r480210406

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1428,8 +1428,36 @@ class Analyzer(
       // SPARK-25942: Resolves aggregate expressions with `AppendColumns`'s children, instead of
       // `AppendColumns`, because `AppendColumns`'s serializer might produce conflict attribute
       // names leading to ambiguous references exception.
-      case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) =>
-        a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
+      case a: Aggregate =>
+        val planForResolve = a.child match {
+          case appendColumns: AppendColumns => appendColumns
+          case _ => a
+        }
+
+        val resolvedGroupingExprs =

Review comment:
> nit:
>
> ```
> val resolvedGroupingExprs = a.groupingExpressions
>   .map...
>   .map...
> ```

Done
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r480183578

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1428,8 +1428,46 @@ class Analyzer(
       // SPARK-25942: Resolves aggregate expressions with `AppendColumns`'s children, instead of
       // `AppendColumns`, because `AppendColumns`'s serializer might produce conflict attribute
       // names leading to ambiguous references exception.
-      case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) =>
-        a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
+      case a: Aggregate =>
+        val planForResolve = a.child match {
+          case appendColumns: AppendColumns => appendColumns
+          case _ => a
+        }
+
+        val resolvedGroupingExprs =
+          a.groupingExpressions.map(resolveExpressionTopDown(_, planForResolve))
+            .map(trimStructFieldAlias)
+
+        val resolvedAggExprs = a.aggregateExpressions
+          .map(resolveExpressionTopDown(_, planForResolve))
+          .map {

Review comment:
> and clearly document it.

Done. Please review the comment.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r48018

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1428,8 +1428,46 @@ class Analyzer(
       // SPARK-25942: Resolves aggregate expressions with `AppendColumns`'s children, instead of
       // `AppendColumns`, because `AppendColumns`'s serializer might produce conflict attribute
       // names leading to ambiguous references exception.
-      case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) =>
-        a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
+      case a: Aggregate =>
+        val planForResolve = a.child match {
+          case appendColumns: AppendColumns => appendColumns
+          case _ => a
+        }
+
+        val resolvedGroupingExprs =
+          a.groupingExpressions.map(resolveExpressionTopDown(_, planForResolve))
+            .map(trimStructFieldAlias)
+
+        val resolvedAggExprs = a.aggregateExpressions
+          .map(resolveExpressionTopDown(_, planForResolve))
+          .map {

Review comment:
> ok, let's create a method with name `trimNonTopLevelStructFieldAlias `

Updated
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r480155321

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1428,8 +1428,46 @@ class Analyzer(
       // SPARK-25942: Resolves aggregate expressions with `AppendColumns`'s children, instead of
       // `AppendColumns`, because `AppendColumns`'s serializer might produce conflict attribute
       // names leading to ambiguous references exception.
-      case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) =>
-        a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
+      case a: Aggregate =>
+        val planForResolve = a.child match {
+          case appendColumns: AppendColumns => appendColumns
+          case _ => a
+        }
+
+        val resolvedGroupingExprs =
+          a.groupingExpressions.map(resolveExpressionTopDown(_, planForResolve))
+            .map(trimStructFieldAlias)
+
+        val resolvedAggExprs = a.aggregateExpressions
+          .map(resolveExpressionTopDown(_, planForResolve))
+          .map {

Review comment:
> Can we just stop here and don't change the `resolvedAggExprs`?

No. When there is a complex expression in both `GROUP BY` and the aggregate expressions, leaving `resolvedAggExprs` untouched means the aggregate expression can't be matched with the grouping expression. For example:

```
checkAnswer(
  sql(
    """
      |SELECT a, get_json_object(c.json_string, '$.i'), SUM(b)
      |FROM t
      |GROUP BY a, get_json_object(c.json_string, '$.i')
      |WITH CUBE
      |""".stripMargin),
  Row("A", "1", 3) :: Row("A", "2", 2) :: Row("A", null, 5) ::
    Row("B", "1", 1) :: Row("B", null, 1) ::
    Row("C", "1", 3) :: Row("C", null, 3) ::
    Row(null, "1", 7) :: Row(null, "2", 2) :: Row(null, null, 9) :: Nil)
```
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r480131732

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1479,6 +1479,42 @@ class Analyzer(
       // Skip the having clause here, this will be handled in ResolveAggregateFunctions.
       case h: UnresolvedHaving => h
 
+      case a: Aggregate =>

Review comment:
> there is a `case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns)` above, please merge that into this case.

Updated
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r480129864

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -3383,6 +3419,25 @@ object CleanupAliases extends Rule[LogicalPlan] {
     }
   }
 
+  // For struct field, it will be resolve as Alias(GetStructField, name),
+  // In Aggregate/GroupingSets this behavior will cause the same struct fields
+  // in aggExprs/groupExprs/selectedGroupByExprs be treated as different ones due to different
+  // ExprIds in Alias, and stops us finding the grouping expressions in aggExprs. Here we
+  // will use this method to remove Alias with different ExprId of GetStructField
+  // in groupByAlias/selectedGroupByExprs and aggregateExpressions
+  def trimStructFieldAlias(e: Expression): Expression = {

Review comment:
> It's only called in `ResolveReferences`, let's define it there.

OK
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r480121972

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1479,6 +1479,42 @@ class Analyzer(
       // Skip the having clause here, this will be handled in ResolveAggregateFunctions.
       case h: UnresolvedHaving => h
 
+      case a: Aggregate =>
+        val resolvedGroupingExprs =
+          a.groupingExpressions.map(resolveExpressionTopDown(_, a))
+            .map(CleanupAliases.trimStructFieldAlias)
+
+        val resolvedAggExprs = a.aggregateExpressions
+          .map(resolveExpressionTopDown(_, a))
+          .map {
+            // Here we want to trim StructField Alias in complex expression
+            // Only Alias(GetStructField, name) can't be trimmed since we need NamedExpression.
+            case alias@Alias(_: GetStructField, _) => alias
+            case e => CleanupAliases.trimStructFieldAlias(e).asInstanceOf[NamedExpression]

Review comment:
> why do we need to do it?

As I said in the code comment: if we don't exclude that case, we get this error:

```
SPARK-31670: Resolve Struct Field in Grouping Aggregate with same ExprId *** FAILED *** (185 milliseconds)
[info] java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression cannot be cast to org.apache.spark.sql.catalyst.expressions.NamedExpression
[info]
```
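The general hazard behind this kind of `ClassCastException` can be shown with a small sketch: once trimming removes the outermost alias, the result is no longer a named expression, and an unchecked `asInstanceOf` fails at runtime. This does not reproduce the Spark failure itself; `NamedExpr`, `Field`, `Named`, `Call` and `CastSketch` are hypothetical names invented for the illustration.

```scala
// Toy model; hypothetical types, not Catalyst classes.
sealed trait Expr
sealed trait NamedExpr extends Expr { def name: String }      // role of NamedExpression
case class Field(struct: String, field: String) extends Expr  // role of GetStructField
case class Named(child: Expr, name: String) extends NamedExpr // role of Alias
case class Call(fn: String, args: List[Expr]) extends Expr

object CastSketch {
  // Removes every alias that wraps a field access, including one at the root.
  def trimAll(e: Expr): Expr = e match {
    case Named(f: Field, _)  => f
    case Named(child, name)  => Named(trimAll(child), name)
    case Call(fn, args)      => Call(fn, args.map(trimAll))
    case other               => other
  }

  // Keeps a root-level alias over a field, trims the rest.
  def trimNonTopLevel(e: Expr): Expr = e match {
    case top @ Named(_: Field, _) => top
    case other                    => trimAll(other)
  }

  // A checked alternative to asInstanceOf.
  def asNamed(e: Expr): Either[String, NamedExpr] = e match {
    case n: NamedExpr => Right(n)
    case other        => Left(s"not a named expression: $other")
  }
}
```

With these definitions, `trimAll(Named(Field("c", "x"), "x"))` yields a bare `Field`, so casting it to `NamedExpr` throws, while `trimNonTopLevel` preserves the alias and the cast succeeds.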
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r480120727

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -570,18 +570,44 @@ class Analyzer(
           s"Grouping sets size cannot be greater than ${GroupingID.dataType.defaultSize * 8}")
       }
 
+    // For struct field, it will be resolve as Alias(GetStructField, name),
+    // In Aggregate/GroupingSets this behavior will cause the same struct fields

Review comment:
cc @cloud-fan

Updated. Since this case is special, we can't just use `CleanupAliases.trimAliases()`, so I added a new method tailored to `GetStructField`.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZh commented on a change in pull request #28490: URL: https://github.com/apache/spark/pull/28490#discussion_r479962914 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ## @@ -570,18 +570,44 @@ class Analyzer( s"Grouping sets size cannot be greater than ${GroupingID.dataType.defaultSize * 8}") } + // For struct field, it will be resolve as Alias(GetStructField, name), + // In Aggregate/GroupingSets this behavior will cause the same struct fields Review comment: > It's simpler to fix the problem earlier. Let's try it. Ok, update later. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r479954858

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##

@@ -570,18 +570,44 @@ class Analyzer(
           s"Grouping sets size cannot be greater than ${GroupingID.dataType.defaultSize * 8}")
       }

+    // For struct field, it will be resolve as Alias(GetStructField, name),
+    // In Aggregate/GroupingSets this behavior will cause the same struct fields

Review comment:

Similar to https://github.com/apache/spark/pull/28490#discussion_r470555710. I need to check whether using `CleanupAliases.trimAliases()` can solve all of the problems.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZh commented on a change in pull request #28490: URL: https://github.com/apache/spark/pull/28490#discussion_r479944701 ## File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ## @@ -3496,6 +3496,98 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark checkIfSeedExistsInExplain(df2) } + test("SPARK-31670: Resolve Struct Field in Grouping Aggregate with same ExprId") { +withTempView("t") { + sql( +""" + |CREATE TEMPORARY VIEW t(a, b, c) AS + |SELECT * FROM VALUES + |('A', 1, NAMED_STRUCT('row_id', 1, 'json_string', '{"i": 1}')), + |('A', 2, NAMED_STRUCT('row_id', 2, 'json_string', '{"i": 1}')), + |('A', 2, NAMED_STRUCT('row_id', 2, 'json_string', '{"i": 2}')), + |('B', 1, NAMED_STRUCT('row_id', 3, 'json_string', '{"i": 1}')), + |('C', 3, NAMED_STRUCT('row_id', 4, 'json_string', '{"i": 1}')) +""".stripMargin) + + checkAnswer( +sql( + """ +|SELECT a, c.json_string, SUM(b) +|FROM t +|GROUP BY a, c.json_string +|WITH CUBE Review comment: > does normal GROUP BY have this bug? Add a test case for normal GROUP BY This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r479942446

## File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ##

@@ -3496,6 +3496,98 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     checkIfSeedExistsInExplain(df2)
   }

+  test("SPARK-31670: Resolve Struct Field in Grouping Aggregate with same ExprId") {
+    withTempView("t") {
+      sql(
+        """
+          |CREATE TEMPORARY VIEW t(a, b, c) AS
+          |SELECT * FROM VALUES
+          |('A', 1, NAMED_STRUCT('row_id', 1, 'json_string', '{"i": 1}')),
+          |('A', 2, NAMED_STRUCT('row_id', 2, 'json_string', '{"i": 1}')),
+          |('A', 2, NAMED_STRUCT('row_id', 2, 'json_string', '{"i": 2}')),
+          |('B', 1, NAMED_STRUCT('row_id', 3, 'json_string', '{"i": 1}')),
+          |('C', 3, NAMED_STRUCT('row_id', 4, 'json_string', '{"i": 1}'))
+        """.stripMargin)
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, c.json_string, SUM(b)
+            |FROM t
+            |GROUP BY a, c.json_string
+            |WITH CUBE

Review comment:

> does normal GROUP BY have this bug?

No, this only happens when the grouping-analytics plan is constructed, and the changed code only fixes that case.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZh commented on a change in pull request #28490: URL: https://github.com/apache/spark/pull/28490#discussion_r479848494 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ## @@ -1479,6 +1479,33 @@ class Analyzer( // Skip the having clause here, this will be handled in ResolveAggregateFunctions. case h: UnresolvedHaving => h + case agg @ (_: Aggregate | _: GroupingSets) => +val resolved = agg.mapExpressions(resolveExpressionTopDown(_, agg)) +val hasStructField = resolved.expressions.exists { + _.collectFirst { case gsf: GetStructField => gsf }.isDefined +} +if (hasStructField) { + // For struct field, it will be resolve as Alias(GetStructField, name), + // In Aggregate/GroupingSets this behavior will cause same struct field + // in aggExprs/groupExprs/selectedGroupByExprs will be resolved divided + // with different ExprId of Alias and replace failed when construct + // Aggregate in ResolveGroupingAnalytics, so we resolve duplicated struct + // field here with same ExprId Review comment: also ping @maropu This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZh commented on a change in pull request #28490: URL: https://github.com/apache/spark/pull/28490#discussion_r479744826 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ## @@ -1479,6 +1479,33 @@ class Analyzer( // Skip the having clause here, this will be handled in ResolveAggregateFunctions. case h: UnresolvedHaving => h + case agg @ (_: Aggregate | _: GroupingSets) => +val resolved = agg.mapExpressions(resolveExpressionTopDown(_, agg)) +val hasStructField = resolved.expressions.exists { + _.collectFirst { case gsf: GetStructField => gsf }.isDefined +} +if (hasStructField) { + // For struct field, it will be resolve as Alias(GetStructField, name), + // In Aggregate/GroupingSets this behavior will cause same struct field + // in aggExprs/groupExprs/selectedGroupByExprs will be resolved divided + // with different ExprId of Alias and replace failed when construct + // Aggregate in ResolveGroupingAnalytics, so we resolve duplicated struct + // field here with same ExprId Review comment: cc @cloud-fan tried to remove alias of struct field in latest pr. Could you take a look. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZh commented on a change in pull request #28490: URL: https://github.com/apache/spark/pull/28490#discussion_r471977082 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ## @@ -1479,6 +1479,33 @@ class Analyzer( // Skip the having clause here, this will be handled in ResolveAggregateFunctions. case h: UnresolvedHaving => h + case agg @ (_: Aggregate | _: GroupingSets) => +val resolved = agg.mapExpressions(resolveExpressionTopDown(_, agg)) +val hasStructField = resolved.expressions.exists { + _.collectFirst { case gsf: GetStructField => gsf }.isDefined +} +if (hasStructField) { + // For struct field, it will be resolve as Alias(GetStructField, name), + // In Aggregate/GroupingSets this behavior will cause same struct field + // in aggExprs/groupExprs/selectedGroupByExprs will be resolved divided + // with different ExprId of Alias and replace failed when construct + // Aggregate in ResolveGroupingAnalytics, so we resolve duplicated struct + // field here with same ExprId Review comment: > Sorry I didn't make myself clear. Can we remove aliases only in `Aggregate.groupingExpressions`? Ok, I will try this way. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r471414905

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##

@@ -1479,6 +1479,33 @@ class Analyzer(
       // Skip the having clause here, this will be handled in ResolveAggregateFunctions.
       case h: UnresolvedHaving => h

+      case agg @ (_: Aggregate | _: GroupingSets) =>
+        val resolved = agg.mapExpressions(resolveExpressionTopDown(_, agg))
+        val hasStructField = resolved.expressions.exists {
+          _.collectFirst { case gsf: GetStructField => gsf }.isDefined
+        }
+        if (hasStructField) {
+          // For struct field, it will be resolve as Alias(GetStructField, name),
+          // In Aggregate/GroupingSets this behavior will cause same struct field
+          // in aggExprs/groupExprs/selectedGroupByExprs will be resolved divided
+          // with different ExprId of Alias and replace failed when construct
+          // Aggregate in ResolveGroupingAnalytics, so we resolve duplicated struct
+          // field here with same ExprId

Review comment:

> But `Aggregate/GroupingSets` does not need `NamedExpression` for grouping expressions, right?

But aggregate expressions do need `NamedExpression`. So you mean removing the `Alias` first and adding it back later when constructing the aggregate expressions? I am not sure we can do it that way.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r470660102

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##

@@ -1479,6 +1479,33 @@ class Analyzer(
       // Skip the having clause here, this will be handled in ResolveAggregateFunctions.
       case h: UnresolvedHaving => h

+      case agg @ (_: Aggregate | _: GroupingSets) =>
+        val resolved = agg.mapExpressions(resolveExpressionTopDown(_, agg))
+        val hasStructField = resolved.expressions.exists {
+          _.collectFirst { case gsf: GetStructField => gsf }.isDefined
+        }
+        if (hasStructField) {
+          // For struct field, it will be resolve as Alias(GetStructField, name),
+          // In Aggregate/GroupingSets this behavior will cause same struct field
+          // in aggExprs/groupExprs/selectedGroupByExprs will be resolved divided
+          // with different ExprId of Alias and replace failed when construct
+          // Aggregate in ResolveGroupingAnalytics, so we resolve duplicated struct
+          // field here with same ExprId
+          val structFieldMap = mutable.Map[String, Alias]()
+          resolved.transformExpressionsDown {

Review comment:

> another concern: for `SELECT a.b, a.b FROM t GROUP BY a.b`, will we end up with an `Aggregate` operator whose output has duplicated ExprId? It's probably not an issue as the two output columns always have the same value.

In the unit test, `struct.sql` is rendered as `t`.`c`.`json_string`, i.e. [table].[column].[field], so it seems there is no case where the same `struct.sql` refers to different columns? The new approach in https://github.com/apache/spark/pull/28490#discussion_r470555710 only handles the current node, which avoids the same `struct.sql` referring to different columns because of aliases at different SQL levels.

Emm, for two `a.b` outputs it is hard to say which one is the one in the cube, so in this case identical outputs with a duplicated ExprId seem OK.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r470555710

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##

@@ -1479,6 +1479,33 @@ class Analyzer(
       // Skip the having clause here, this will be handled in ResolveAggregateFunctions.
       case h: UnresolvedHaving => h

+      case agg @ (_: Aggregate | _: GroupingSets) =>
+        val resolved = agg.mapExpressions(resolveExpressionTopDown(_, agg))
+        val hasStructField = resolved.expressions.exists {
+          _.collectFirst { case gsf: GetStructField => gsf }.isDefined
+        }
+        if (hasStructField) {
+          // For struct field, it will be resolve as Alias(GetStructField, name),
+          // In Aggregate/GroupingSets this behavior will cause same struct field
+          // in aggExprs/groupExprs/selectedGroupByExprs will be resolved divided
+          // with different ExprId of Alias and replace failed when construct
+          // Aggregate in ResolveGroupingAnalytics, so we resolve duplicated struct
+          // field here with same ExprId
+          val structFieldMap = mutable.Map[String, Alias]()
+          resolved.transformExpressionsDown {
+            case a @ Alias(struct: GetStructField, _) =>
+              if (structFieldMap.contains(struct.sql)) {

Review comment:

> is it safe to use sql string as key?

Emm, how about

```
case agg @ (_: Aggregate | _: GroupingSets) =>
  def resolveDuplicatedStructField(
      expr: Expression,
      fieldMap: mutable.Map[String, Alias]): Expression = {
    expr.transform {
      case a @ Alias(struct: GetStructField, _) =>
        val exprId = fieldMap.getOrElse(struct.sql, a).exprId
        Alias(a.child, a.name)(exprId, a.qualifier, a.explicitMetadata)
    }
  }

  val resolved = agg.mapExpressions(resolveExpressionTopDown(_, agg))
  val resolvedExpressions = resolved match {
    case aggregate: Aggregate =>
      aggregate.aggregateExpressions ++ aggregate.groupingExpressions
    case groupingSets: GroupingSets =>
      groupingSets.selectedGroupByExprs.flatMap(_.toSeq) ++
        groupingSets.groupByExprs ++ groupingSets.aggregations
    case _ => Nil
  }
  val hasStructField = resolvedExpressions.exists {
    _.collectFirst { case gsf: GetStructField => gsf }.isDefined
  }
  if (hasStructField) {
    // A struct field is resolved as Alias(GetStructField, name).
    // In Aggregate/GroupingSets this behavior will cause the same struct fields
    // in aggExprs/groupExprs/selectedGroupByExprs be treated as different ones
    // due to different ExprIds in Alias, and stops us finding the grouping expressions
    // in aggExprs. Here we resolve duplicated struct fields with the same ExprId.
    val structFieldMap = mutable.Map[String, Alias]()
    resolvedExpressions.flatMap(_.collect {
      case a @ Alias(struct: GetStructField, _) => struct.sql -> a
    }).foreach { case (name, alias) =>
      if (!structFieldMap.contains(name)) {
        structFieldMap += (name -> alias)
      }
    }
    resolved match {
      case aggregate: Aggregate =>
        aggregate.copy(
          groupingExpressions = aggregate.groupingExpressions
            .map(resolveDuplicatedStructField(_, structFieldMap)),
          aggregateExpressions = aggregate.aggregateExpressions
            .map(resolveDuplicatedStructField(_, structFieldMap)
              .asInstanceOf[NamedExpression]))
      case groupingSets: GroupingSets =>
        groupingSets.copy(
          selectedGroupByExprs = groupingSets.selectedGroupByExprs
            .map(_.map(resolveDuplicatedStructField(_, structFieldMap))),
          groupByExprs = groupingSets.groupByExprs
            .map(resolveDuplicatedStructField(_, structFieldMap)),
          aggregations = groupingSets.aggregations
            .map(resolveDuplicatedStructField(_, structFieldMap)
              .asInstanceOf[NamedExpression])
        )
    }
  } else {
    resolved
  }
```
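The core idea of the suggestion above is that the first alias seen for a given `struct.sql` key donates its ExprId to every later alias with the same key. A minimal self-contained sketch of that idea follows. It is not Catalyst code: `GetField`, `Alias`, the `sql` rendering, and `shareExprIds` are hypothetical stand-ins, and it assumes, as discussed in this thread, that equal SQL strings within one plan node denote the same field.

```scala
import scala.collection.mutable

// Toy model only: hypothetical stand-ins, NOT Spark classes.
object SharedExprIdSketch {
  final case class GetField(column: String, field: String) {
    // Assumed rendering of the field access, used as the map key.
    def sql: String = s"$column.$field"
  }
  final case class Alias(child: GetField, name: String, exprId: Long)

  // Give every alias of the same struct field the ExprId of the first
  // alias seen for that field. Names are left unchanged.
  def shareExprIds(aliases: Seq[Alias]): Seq[Alias] = {
    val firstId = mutable.Map.empty[String, Long]
    // First pass: remember the first ExprId per key.
    aliases.foreach(a => firstId.getOrElseUpdate(a.child.sql, a.exprId))
    // Second pass: rewrite each alias with the remembered ExprId.
    aliases.map(a => a.copy(exprId = firstId(a.child.sql)))
  }
}
```

Collecting the keys in a first pass and rewriting in a second keeps the result independent of where the duplicates appear, which mirrors the two-step structure of the suggested code.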
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r470509589

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##

@@ -1479,6 +1479,33 @@ class Analyzer(
       // Skip the having clause here, this will be handled in ResolveAggregateFunctions.
       case h: UnresolvedHaving => h

+      case agg @ (_: Aggregate | _: GroupingSets) =>
+        val resolved = agg.mapExpressions(resolveExpressionTopDown(_, agg))
+        val hasStructField = resolved.expressions.exists {
+          _.collectFirst { case gsf: GetStructField => gsf }.isDefined
+        }
+        if (hasStructField) {
+          // For struct field, it will be resolve as Alias(GetStructField, name),
+          // In Aggregate/GroupingSets this behavior will cause same struct field
+          // in aggExprs/groupExprs/selectedGroupByExprs will be resolved divided
+          // with different ExprId of Alias and replace failed when construct
+          // Aggregate in ResolveGroupingAnalytics, so we resolve duplicated struct
+          // field here with same ExprId

Review comment:

> ah i see, then can we remove aliases in grouping expressions here?

I tried that. No, because many APIs require a `NamedExpression` parameter.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r470362613

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##

@@ -1479,6 +1479,33 @@ class Analyzer(
       // Skip the having clause here, this will be handled in ResolveAggregateFunctions.
       case h: UnresolvedHaving => h

+      case agg @ (_: Aggregate | _: GroupingSets) =>
+        val resolved = agg.mapExpressions(resolveExpressionTopDown(_, agg))
+        val hasStructField = resolved.expressions.exists {
+          _.collectFirst { case gsf: GetStructField => gsf }.isDefined
+        }
+        if (hasStructField) {
+          // For struct field, it will be resolve as Alias(GetStructField, name),
+          // In Aggregate/GroupingSets this behavior will cause same struct field
+          // in aggExprs/groupExprs/selectedGroupByExprs will be resolved divided
+          // with different ExprId of Alias and replace failed when construct
+          // Aggregate in ResolveGroupingAnalytics, so we resolve duplicated struct
+          // field here with same ExprId

Review comment:

> I don't get it. `CleanupAliases` will remove aliases from the grouping expressions. Why do we hit the bug?

This error happens when `ResolveGroupingAnalytics` constructs the grouping-analytics `Aggregate`. When the expressions are expanded, matching fails because the ExprIds differ.
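The mismatch described above can be pictured with a toy model. This is a minimal sketch, not Catalyst code: `GetField`, `Alias`, and the two lookup helpers are hypothetical, and the only assumption is that each resolution of `c.json_string` wraps the field in an alias carrying a fresh ExprId. A lookup that compares whole aliases then misses, while a lookup on the aliased child still finds the grouping expression.

```scala
// Toy model only: hypothetical stand-ins, NOT Spark classes.
object ExprIdMismatchSketch {
  final case class GetField(column: String, field: String)
  final case class Alias(child: GetField, name: String, exprId: Long)

  // Lookup that compares the whole alias, ExprId included.
  def indexByAlias(groupBy: Seq[Alias], selected: Alias): Int =
    groupBy.indexOf(selected)

  // Lookup that ignores the alias wrapper and compares only the child.
  def indexByChild(groupBy: Seq[Alias], selected: Alias): Int =
    groupBy.indexWhere(_.child == selected.child)

  // The same struct field, resolved once in the SELECT list and once
  // in the GROUP BY list, ends up with two different ExprIds.
  val inSelect: Alias =
    Alias(GetField("c", "json_string"), "json_string", exprId = 1L)
  val inGroupBy: Alias =
    Alias(GetField("c", "json_string"), "json_string", exprId = 2L)
}
```

Both remedies discussed in this thread fit the picture: sharing one ExprId makes the first lookup succeed, and trimming the alias reduces the comparison to the second lookup.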
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZh commented on a change in pull request #28490: URL: https://github.com/apache/spark/pull/28490#discussion_r436360282 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ## @@ -1481,7 +1486,35 @@ class Analyzer( case q: LogicalPlan => logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}") Review comment: > w/ some code cleanup; Done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZh commented on a change in pull request #28490: URL: https://github.com/apache/spark/pull/28490#discussion_r436357558 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ## @@ -1259,6 +1259,11 @@ class Analyzer( attr.withExprId(exprId) } +private def dedupStructField(attr: Alias, structFieldMap: Map[String, Attribute]) = { Review comment: > Not used now? Oh, yea This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZh commented on a change in pull request #28490: URL: https://github.com/apache/spark/pull/28490#discussion_r436329103 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ## @@ -1479,11 +1486,70 @@ class Analyzer( // Skip the having clause here, this will be handled in ResolveAggregateFunctions. case h: UnresolvedHaving => h + case p: LogicalPlan if needResolveStructField(p) => +logTrace(s"Attempting to resolve ${p.simpleString(SQLConf.get.maxToStringFields)}") +val resolved = p.mapExpressions(resolveExpressionTopDown(_, p)) +val structFieldMap = new mutable.HashMap[String, Alias] +resolved.transformExpressions { + case a @ Alias(struct: GetStructField, _) => +if (structFieldMap.contains(struct.sql)) { + val exprId = structFieldMap.getOrElse(struct.sql, a).exprId + Alias(a.child, a.name)(exprId, a.qualifier, a.explicitMetadata) +} else { + structFieldMap.put(struct.sql, a) + a +} + case e => e +} + case q: LogicalPlan => logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}") q.mapExpressions(resolveExpressionTopDown(_, q)) } +def needResolveStructField(plan: LogicalPlan): Boolean = { + plan match { +case UnresolvedHaving(havingCondition, a: Aggregate) + if containSameStructFields(a.groupingExpressions.flatMap(_.references), +a.aggregateExpressions.flatMap(_.references), +Some(havingCondition.references.toSeq)) => true +case Aggregate(groupingExpressions, aggregateExpressions, _) + if containSameStructFields(groupingExpressions.flatMap(_.references), +aggregateExpressions.flatMap(_.references)) => true +case GroupingSets(selectedGroupByExprs, groupByExprs, _, aggregations) + if containSameStructFields(groupByExprs.flatMap(_.references), +aggregations.flatMap(_.references), +Some(selectedGroupByExprs.flatMap(_.flatMap(_.references => true +case _ => false + } +} + +def containSameStructFields( Review comment: > I think its better to avoid comparing unresolved attributes could we resolve 
them then detect the mismatched exprIds?

How about the current version? It no longer checks anything, it just resolves.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZh commented on a change in pull request #28490: URL: https://github.com/apache/spark/pull/28490#discussion_r436327006 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ## @@ -1479,11 +1486,70 @@ class Analyzer( // Skip the having clause here, this will be handled in ResolveAggregateFunctions. case h: UnresolvedHaving => h + case p: LogicalPlan if needResolveStructField(p) => +logTrace(s"Attempting to resolve ${p.simpleString(SQLConf.get.maxToStringFields)}") +val resolved = p.mapExpressions(resolveExpressionTopDown(_, p)) +val structFieldMap = new mutable.HashMap[String, Alias] +resolved.transformExpressions { + case a @ Alias(struct: GetStructField, _) => +if (structFieldMap.contains(struct.sql)) { + val exprId = structFieldMap.getOrElse(struct.sql, a).exprId + Alias(a.child, a.name)(exprId, a.qualifier, a.explicitMetadata) +} else { + structFieldMap.put(struct.sql, a) + a +} + case e => e +} + case q: LogicalPlan => logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}") q.mapExpressions(resolveExpressionTopDown(_, q)) } +def needResolveStructField(plan: LogicalPlan): Boolean = { + plan match { +case UnresolvedHaving(havingCondition, a: Aggregate) + if containSameStructFields(a.groupingExpressions.flatMap(_.references), +a.aggregateExpressions.flatMap(_.references), +Some(havingCondition.references.toSeq)) => true +case Aggregate(groupingExpressions, aggregateExpressions, _) + if containSameStructFields(groupingExpressions.flatMap(_.references), +aggregateExpressions.flatMap(_.references)) => true +case GroupingSets(selectedGroupByExprs, groupByExprs, _, aggregations) + if containSameStructFields(groupByExprs.flatMap(_.references), +aggregations.flatMap(_.references), +Some(selectedGroupByExprs.flatMap(_.flatMap(_.references => true +case _ => false + } +} + +def containSameStructFields( +grpExprs: Seq[Attribute], Review comment: > nit: `grpExprs` -> `groupExprs` for consistency. 
Done ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ## @@ -1479,11 +1486,70 @@ class Analyzer( // Skip the having clause here, this will be handled in ResolveAggregateFunctions. case h: UnresolvedHaving => h + case p: LogicalPlan if needResolveStructField(p) => +logTrace(s"Attempting to resolve ${p.simpleString(SQLConf.get.maxToStringFields)}") +val resolved = p.mapExpressions(resolveExpressionTopDown(_, p)) +val structFieldMap = new mutable.HashMap[String, Alias] Review comment: > `mutable.HashMap` -> `mutable.Map` Done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r436327053

## File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ##
@@ -3496,6 +3496,88 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     checkIfSeedExistsInExplain(df2)
   }
+
+  test("SPARK-31670: Struct Field in groupByExpr with CUBE") {
+    withTable("t") {
+      sql(
+        """CREATE TABLE t(
+          |a STRING,
+          |b INT,
+          |c ARRAY>,
+          |d ARRAY>,
+          |e ARRAY>)
+          |USING ORC""".stripMargin)

Review comment:
> See: [#28490 (comment)](https://github.com/apache/spark/pull/28490#discussion_r434371366)

I changed the test case to use a view with actual data.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r436324728

## File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ##
@@ -3496,6 +3496,88 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     checkIfSeedExistsInExplain(df2)
   }
+
+  test("SPARK-31670: Struct Field in groupByExpr with CUBE") {
+    withTable("t") {
+      sql(
+        """CREATE TABLE t(
+          |a STRING,
+          |b INT,
+          |c ARRAY>,
+          |d ARRAY>,
+          |e ARRAY>)
+          |USING ORC""".stripMargin)
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, each.json_string, SUM(b)
+            |FROM t
+            |LATERAL VIEW EXPLODE(c) x AS each
+            |GROUP BY a, each.json_string
+            |WITH CUBE
+            |""".stripMargin), Nil)
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, get_json_object(each.json_string, '$.i'), SUM(b)
+            |FROM t
+            |LATERAL VIEW EXPLODE(c) X AS each
+            |GROUP BY a, get_json_object(each.json_string, '$.i')
+            |WITH CUBE
+            |""".stripMargin), Nil)
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, each.json_string AS json_string, SUM(b)
+            |FROM t
+            |LATERAL VIEW EXPLODE(c) x AS each
+            |GROUP BY a, each.json_string
+            |WITH CUBE
+            |""".stripMargin), Nil)
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, each.json_string as js, SUM(b)
+            |FROM t
+            |LATERAL VIEW EXPLODE(c) X AS each
+            |GROUP BY a, each.json_string
+            |WITH CUBE
+            |""".stripMargin), Nil)
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, each.json_string as js, SUM(b)
+            |FROM t
+            |LATERAL VIEW EXPLODE(c) X AS each
+            |GROUP BY a, each.json_string
+            |WITH ROLLUP
+            |""".stripMargin), Nil)
+
+      sql(
+        """
+          |SELECT a, each.json_string, SUM(b)
+          |FROM t
+          |LATERAL VIEW EXPLODE(c) X AS each
+          |GROUP BY a, each.json_string
+          |GROUPING sets((a),(a, each.json_string))
+          |""".stripMargin).explain(true)
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, each.json_string, SUM(b)
+            |FROM t
+            |LATERAL VIEW EXPLODE(c) X AS each
+            |GROUP BY a, each.json_string
+            |GROUPING sets((a),(a, each.json_string))
+            |""".stripMargin), Nil)

Review comment:
> Could you add tests having queries with `HAVING` clauses?

It seems I made a mistake: `HAVING` won't produce a duplicate field, since the having condition won't contain grouping keys.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r436324734

## File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ##
@@ -3496,6 +3496,88 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     checkIfSeedExistsInExplain(df2)
   }
+
+  test("SPARK-31670: Struct Field in groupByExpr with CUBE") {
+    withTable("t") {
+      sql(
+        """CREATE TABLE t(
+          |a STRING,
+          |b INT,
+          |c ARRAY>,
+          |d ARRAY>,
+          |e ARRAY>)
+          |USING ORC""".stripMargin)
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, each.json_string, SUM(b)
+            |FROM t
+            |LATERAL VIEW EXPLODE(c) x AS each
+            |GROUP BY a, each.json_string
+            |WITH CUBE
+            |""".stripMargin), Nil)
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, get_json_object(each.json_string, '$.i'), SUM(b)
+            |FROM t
+            |LATERAL VIEW EXPLODE(c) X AS each
+            |GROUP BY a, get_json_object(each.json_string, '$.i')
+            |WITH CUBE
+            |""".stripMargin), Nil)
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, each.json_string AS json_string, SUM(b)
+            |FROM t
+            |LATERAL VIEW EXPLODE(c) x AS each

Review comment:
> btw, we must need `lateral view` to reproduce this issue? I mean, this issue cannot happen without `lateral view`?

No, `lateral view` is not needed to reproduce it. I changed the test.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #28490: [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId
AngersZhuuuu commented on a change in pull request #28490:
URL: https://github.com/apache/spark/pull/28490#discussion_r436324690

## File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ##
@@ -3496,6 +3496,88 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     checkIfSeedExistsInExplain(df2)
   }
+
+  test("SPARK-31670: Struct Field in groupByExpr with CUBE") {

Review comment:
> Could you please make the title more correct? I think we don't need the word `with CUBE`.

Is using the current PR title OK?