maropu commented on a change in pull request #28898:
URL: https://github.com/apache/spark/pull/28898#discussion_r447488633



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
##########
@@ -153,6 +169,7 @@ object NestedColumnAliasing {
     val aliasSub = nestedFieldReferences.asInstanceOf[Seq[ExtractValue]]
       .filter(!_.references.subsetOf(exclusiveAttrSet))
       .groupBy(_.references.head)
+      .toList

Review comment:
       We need this?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
##########
@@ -39,6 +39,20 @@ object NestedColumnAliasing {
           NestedColumnAliasing.replaceToAliases(plan, nestedFieldToAlias, 
attrToAliases)
       }
 
+    /**
+     * This pattern is needed to support [[Filter]] plan cases like
+     * [[Project]]->[[Filter]]->listed plan in `canProjectPushThrough` (e.g., 
[[Window]]).
+     * The reason why we don't simply add [[Filter]] in 
`canProjectPushThrough` is that
+     * the optimizer can hit an infinite loop during the 
[[PushDownPredicates]] rule.
+     */
+    case Project(projectList, Filter(condition, child))
+      if SQLConf.get.nestedSchemaPruningEnabled && 
canProjectPushThrough(child) =>

Review comment:
       nit: indent

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
##########
@@ -178,11 +195,16 @@ object NestedColumnAliasing {
             nestedFieldToAlias
               .map { case (nestedField, _) => 
totalFieldNum(nestedField.dataType) }
               .sum < totalFieldNum(attr.dataType)) {
-          Some(attr.exprId -> nestedFieldToAlias)
+          Some((attr.exprId, nestedFieldToAlias))

Review comment:
       Why did change this?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
##########
@@ -178,11 +195,16 @@ object NestedColumnAliasing {
             nestedFieldToAlias
               .map { case (nestedField, _) => 
totalFieldNum(nestedField.dataType) }
               .sum < totalFieldNum(attr.dataType)) {
-          Some(attr.exprId -> nestedFieldToAlias)
+          Some((attr.exprId, nestedFieldToAlias))
         } else {
           None
         }
       }
+      .groupBy(_._1) // To fix same ExprId mapped to different attribute 
instance
+      .map {
+        case (exprId: ExprId, expressions: List[(ExprId, Seq[(ExtractValue, 
Alias)])]) =>
+          exprId -> expressions.flatMap(_._2)
+      }

Review comment:
       Plz avoid the long chaining and see: 
https://github.com/databricks/scala-style-guide#chaining

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
##########
@@ -178,11 +195,16 @@ object NestedColumnAliasing {
             nestedFieldToAlias
               .map { case (nestedField, _) => 
totalFieldNum(nestedField.dataType) }
               .sum < totalFieldNum(attr.dataType)) {
-          Some(attr.exprId -> nestedFieldToAlias)
+          Some((attr.exprId, nestedFieldToAlias))
         } else {
           None
         }
       }
+      .groupBy(_._1) // To fix same ExprId mapped to different attribute 
instance

Review comment:
       Is this an existing issue? Could you show us a query to reproduce this?

##########
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
##########
@@ -493,6 +491,87 @@ class NestedColumnAliasingSuite extends SchemaPruningTest {
     comparePlans(optimized3, expected3)
   }
 
+  test("Nested field pruning for Window") {
+    val spec = windowSpec($"address" :: Nil, $"id".asc :: Nil, 
UnspecifiedFrame)
+    val winExpr = windowExpr(RowNumber().toAggregateExpression(), spec)
+
+    val query1 = contact
+      .select($"name.first", winExpr.as('window))
+      .analyze
+    val optimized1 = Optimize.execute(query1)
+    val expected1 = contact
+      .select($"name.first", $"address", $"id")
+      .window(Seq(winExpr.as("window")), Seq($"address"), Seq($"id".asc))
+      .select($"first", $"window")
+      .analyze
+    comparePlans(optimized1, expected1)
+
+    val query2 = contact
+      .select($"name.first", winExpr.as('window))
+      .orderBy($"name.last".asc)
+      .analyze
+    val optimized2 = Optimize.execute(query2)
+    val aliases2 = collectGeneratedAliases(optimized2)
+    val expected2 = contact
+      .select($"name.first", $"address", $"id", $"name.last".as(aliases2(1)))
+      .window(Seq(winExpr.as("window")), Seq($"address"), Seq($"id".asc))
+      .select($"first", $"window", $"${aliases2(1)}".as(aliases2(0)))
+      .orderBy($"${aliases2(0)}".asc)
+      .select($"first", $"window")
+      .analyze
+    comparePlans(optimized2, expected2)
+  }
+
+  test("Nested field pruning for Filter") {
+    val spec = windowSpec($"address" :: Nil, $"id".asc :: Nil, 
UnspecifiedFrame)
+    val winExpr = windowExpr(RowNumber().toAggregateExpression(), spec)
+    val query = contact.select($"name.first", winExpr.as('window))
+      .where($"window" === 1 && $"name.first" === "a")

Review comment:
       Could you add some tests for the other combinations, e.g., Filter + 
Limit, Filter + Sort, ...?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to