tanelk commented on a change in pull request #29950:
URL: https://github.com/apache/spark/pull/29950#discussion_r501976989



##########
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
##########
@@ -170,4 +171,34 @@ class CollapseProjectSuite extends PlanTest {
     val expected = Sample(0.0, 0.6, false, 11L, relation.select('a as 
'c)).analyze
     comparePlans(optimized, expected)
   }
+
+  test("SPARK-32945: avoid collapsing projects if reaching max allowed common 
exprs") {
+    val options = Map.empty[String, String]
+    val schema = StructType.fromDDL("a int, b int, c string, d long")
+
+    Seq("1", "2", "3", "4").foreach { maxCommonExprs =>
+      withSQLConf(SQLConf.MAX_COMMON_EXPRS_IN_COLLAPSE_PROJECT.key -> 
maxCommonExprs) {
+        // If we collapse two Projects, `JsonToStructs` will be repeated three 
times.
+        val relation = LocalRelation('json.string)
+        val query = relation.select(
+          JsonToStructs(schema, options, 'json).as("struct"))
+          .select(
+            GetStructField('struct, 0).as("a"),
+            GetStructField('struct, 1).as("b"),
+            GetStructField('struct, 2).as("c")).analyze

Review comment:
       When using the dataset API, then it would be very common to chain 
`withColumn` calls:
   ```
   dataset
       .withColumn("json", ...)
       .withColumn("a", col("json").getField("a"))
       .withColumn("b", col("json").getField("b"))
       .withColumn("c", col("json").getField("c"))
   ```
   
   In that case the query should look more like this:
   ```
           val query = relation
             .select('json, JsonToStructs(schema, options, 'json).as("struct"))
             .select('json, 'struct, GetStructField('struct, 0).as("a"))
             .select('json, 'struct, 'a, GetStructField('struct, 1).as("b"))
             .select('json, 'struct, 'a, 'b, GetStructField('struct, 2).as("c"))
             .analyze
   ```
   
   The `CollapseProject` rule uses `transformUp`. It seems that in that case we 
do not get the expected results from this optimization.

##########
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
##########
@@ -170,4 +171,34 @@ class CollapseProjectSuite extends PlanTest {
     val expected = Sample(0.0, 0.6, false, 11L, relation.select('a as 
'c)).analyze
     comparePlans(optimized, expected)
   }
+
+  test("SPARK-32945: avoid collapsing projects if reaching max allowed common 
exprs") {
+    val options = Map.empty[String, String]
+    val schema = StructType.fromDDL("a int, b int, c string, d long")
+
+    Seq("1", "2", "3", "4").foreach { maxCommonExprs =>
+      withSQLConf(SQLConf.MAX_COMMON_EXPRS_IN_COLLAPSE_PROJECT.key -> 
maxCommonExprs) {
+        // If we collapse two Projects, `JsonToStructs` will be repeated three 
times.
+        val relation = LocalRelation('json.string)
+        val query = relation.select(
+          JsonToStructs(schema, options, 'json).as("struct"))
+          .select(
+            GetStructField('struct, 0).as("a"),
+            GetStructField('struct, 1).as("b"),
+            GetStructField('struct, 2).as("c")).analyze

Review comment:
       If there is a choin of projects: `P1(P2(P3(P4(...))))`, then using 
`transformDown` will firstly merge `P1` and `P2` into `P12` and then it will go 
to its child `P3` and merge it with `P4` into `P34`. Only on the second 
iteration it will merge all 4 of these.
   
   In this case we want to merge `P123` and then see, that we can't merge with 
`P4` because we would exceed `maxCommonExprsInCollapseProject`.
   
   

##########
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
##########
@@ -170,4 +171,34 @@ class CollapseProjectSuite extends PlanTest {
     val expected = Sample(0.0, 0.6, false, 11L, relation.select('a as 
'c)).analyze
     comparePlans(optimized, expected)
   }
+
+  test("SPARK-32945: avoid collapsing projects if reaching max allowed common 
exprs") {
+    val options = Map.empty[String, String]
+    val schema = StructType.fromDDL("a int, b int, c string, d long")
+
+    Seq("1", "2", "3", "4").foreach { maxCommonExprs =>
+      withSQLConf(SQLConf.MAX_COMMON_EXPRS_IN_COLLAPSE_PROJECT.key -> 
maxCommonExprs) {
+        // If we collapse two Projects, `JsonToStructs` will be repeated three 
times.
+        val relation = LocalRelation('json.string)
+        val query = relation.select(
+          JsonToStructs(schema, options, 'json).as("struct"))
+          .select(
+            GetStructField('struct, 0).as("a"),
+            GetStructField('struct, 1).as("b"),
+            GetStructField('struct, 2).as("c")).analyze

Review comment:
       I think, that correct way would be using `transformDown` in a similar 
manner to `recursiveRemoveSort` in #21072. 
   So basically when you hit the first `Project`, then you collect all 
consecutive `Projects` until you hit the `maxCommonExprsInCollapseProject` 
limit and merge them.

##########
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
##########
@@ -170,4 +171,34 @@ class CollapseProjectSuite extends PlanTest {
     val expected = Sample(0.0, 0.6, false, 11L, relation.select('a as 
'c)).analyze
     comparePlans(optimized, expected)
   }
+
+  test("SPARK-32945: avoid collapsing projects if reaching max allowed common 
exprs") {
+    val options = Map.empty[String, String]
+    val schema = StructType.fromDDL("a int, b int, c string, d long")
+
+    Seq("1", "2", "3", "4").foreach { maxCommonExprs =>
+      withSQLConf(SQLConf.MAX_COMMON_EXPRS_IN_COLLAPSE_PROJECT.key -> 
maxCommonExprs) {
+        // If we collapse two Projects, `JsonToStructs` will be repeated three 
times.
+        val relation = LocalRelation('json.string)
+        val query = relation.select(
+          JsonToStructs(schema, options, 'json).as("struct"))
+          .select(
+            GetStructField('struct, 0).as("a"),
+            GetStructField('struct, 1).as("b"),
+            GetStructField('struct, 2).as("c")).analyze

Review comment:
       If there is a chain of projects: `P1(P2(P3(P4(...))))`, then using 
`transformDown` will firstly merge `P1` and `P2` into `P12` and then it will go 
to its child `P3` and merge it with `P4` into `P34`. Only on the second 
iteration it will merge all 4 of these.
   
   In this case we want to merge `P123` and then see, that we can't merge with 
`P4` because we would exceed `maxCommonExprsInCollapseProject`.
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to