tanelk commented on a change in pull request #29950:
URL: https://github.com/apache/spark/pull/29950#discussion_r501976989
##########
File path:
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
##########
@@ -170,4 +171,34 @@ class CollapseProjectSuite extends PlanTest {
val expected = Sample(0.0, 0.6, false, 11L, relation.select('a as 'c)).analyze
comparePlans(optimized, expected)
}
+
+ test("SPARK-32945: avoid collapsing projects if reaching max allowed common exprs") {
+ val options = Map.empty[String, String]
+ val schema = StructType.fromDDL("a int, b int, c string, d long")
+
+ Seq("1", "2", "3", "4").foreach { maxCommonExprs =>
+ withSQLConf(SQLConf.MAX_COMMON_EXPRS_IN_COLLAPSE_PROJECT.key -> maxCommonExprs) {
+ // If we collapse two Projects, `JsonToStructs` will be repeated three times.
+ val relation = LocalRelation('json.string)
+ val query = relation.select(
+ JsonToStructs(schema, options, 'json).as("struct"))
+ .select(
+ GetStructField('struct, 0).as("a"),
+ GetStructField('struct, 1).as("b"),
+ GetStructField('struct, 2).as("c")).analyze
Review comment:
When using the Dataset API, it is very common to chain `withColumn` calls:
```scala
import org.apache.spark.sql.functions.col

dataset
.withColumn("json", ...)
.withColumn("a", col("json").getField("a"))
.withColumn("b", col("json").getField("b"))
.withColumn("c", col("json").getField("c"))
```
In that case the query should look more like this:
```scala
val query = relation
.select('json, JsonToStructs(schema, options, 'json).as("struct"))
.select('json, 'struct, GetStructField('struct, 0).as("a"))
.select('json, 'struct, 'a, GetStructField('struct, 1).as("b"))
.select('json, 'struct, 'a, 'b, GetStructField('struct, 2).as("c"))
.analyze
```
The `CollapseProject` rule uses `transformUp`, and it seems that in this case
we do not get the expected result from this optimization.
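A toy model may help make this concern concrete. The sketch below is hypothetical: its `Expr`, `Attr`, `Call`, and `Proj` types are not Catalyst classes, and it assumes the limit is checked once per merge by counting how many times the upper projection references each non-attribute output of the lower one (the PR's actual check may differ). It collapses the chained `withColumn` query pairwise from the bottom, in the order `transformUp` would visit the nodes, and counts the `JsonToStructs` calls left in the result.

```scala
object CollapseUpToy {
  // Hypothetical toy model of projections; these are not Catalyst classes.
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Call(fn: String, args: List[Expr]) extends Expr

  // A projection is an ordered list of (output name, expression) pairs.
  type Proj = List[(String, Expr)]

  // Inline the lower projection's expressions into an upper expression.
  def substitute(e: Expr, lower: Map[String, Expr]): Expr = e match {
    case Attr(n)       => lower.getOrElse(n, e)
    case Call(f, args) => Call(f, args.map(substitute(_, lower)))
  }

  // How many times `e` references the attribute `name`.
  def refs(e: Expr, name: String): Int = e match {
    case Attr(n)       => if (n == name) 1 else 0
    case Call(_, args) => args.map(refs(_, name)).sum
  }

  // How many calls to `fn` appear in `e`.
  def calls(e: Expr, fn: String): Int = e match {
    case Attr(_)       => 0
    case Call(f, args) => (if (f == fn) 1 else 0) + args.map(calls(_, fn)).sum
  }

  // Assumed per-merge check: the largest number of times the upper projection
  // references any non-attribute output of the lower projection.
  def maxCommonRefs(upper: Proj, lower: Proj): Int = {
    val counts = lower.collect { case (n, _: Call) => upper.map { case (_, e) => refs(e, n) }.sum }
    if (counts.isEmpty) 0 else counts.max
  }

  def merge(upper: Proj, lower: Proj): Proj = {
    val inlined = lower.toMap
    upper.map { case (n, e) => (n, substitute(e, inlined)) }
  }

  // Pairwise, bottom-up collapsing in transformUp order. `plan` lists the
  // projections bottom first; the result is also bottom first.
  def collapseBottomUp(plan: List[Proj], limit: Int): List[Proj] =
    plan.foldLeft(List.empty[Proj]) {
      case (top :: rest, p) if maxCommonRefs(p, top) <= limit => merge(p, top) :: rest
      case (acc, p) => p :: acc
    }.reverse

  def jsonToStructsCalls(plan: List[Proj]): Int =
    plan.flatMap(_.map(_._2)).map(calls(_, "JsonToStructs")).sum

  private def pass(names: String*): Proj = names.toList.map(n => (n, Attr(n)))
  private def field(i: Int): Expr = Call(s"GetStructField$i", List(Attr("struct")))

  // The chained `withColumn` query, bottom `select` first.
  val withColumnChain: List[Proj] = List(
    pass("json") :+ ("struct" -> Call("JsonToStructs", List(Attr("json")))),
    pass("json", "struct") :+ ("a" -> field(0)),
    pass("json", "struct", "a") :+ ("b" -> field(1)),
    pass("json", "struct", "a", "b") :+ ("c" -> field(2)))

  def main(args: Array[String]): Unit =
    for (limit <- 1 to 4) {
      val out = collapseBottomUp(withColumnChain, limit)
      println(s"limit=$limit projects=${out.size} JsonToStructs=${jsonToStructsCalls(out)}")
    }
}
```

In this model, with a limit of 2 or 3 each individual merge sees at most two references to `struct`, so the whole chain still collapses into a single projection that evaluates `JsonToStructs` four times.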
##########
Review comment:
If there is a chain of projects `P1(P2(P3(P4(...))))`, then `transformDown`
will first merge `P1` and `P2` into `P12`, and then it will go to its child
`P3` and merge it with `P4` into `P34`. Only on the second iteration will it
merge all four of them.
In this case we want to merge `P123` and then see that we can't merge it with
`P4` because that would exceed `maxCommonExprsInCollapseProject`.
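The traversal order described above can be reproduced with a small sketch. It uses a hypothetical toy plan type (not Spark's `LogicalPlan`) and a traversal written in the spirit of `transformDown`: apply the rule to a node first, then recurse into the children of the result. The rule here merges any two adjacent projections unconditionally, with no limit.

```scala
object TransformDownToy {
  // Hypothetical toy plan: a chain of projections over a leaf (not Spark's classes).
  sealed trait Plan
  case object Leaf extends Plan
  // `ids` records which original projections were folded into this node.
  final case class Proj(ids: List[Int], child: Plan) extends Plan

  // Pre-order traversal: rule on the node first, then on the result's children.
  def transformDown(p: Plan)(rule: PartialFunction[Plan, Plan]): Plan =
    rule.applyOrElse(p, (x: Plan) => x) match {
      case Proj(ids, c) => Proj(ids, transformDown(c)(rule))
      case Leaf         => Leaf
    }

  // Merge a projection with a projection child, unconditionally.
  val collapse: PartialFunction[Plan, Plan] = {
    case Proj(upper, Proj(lower, grandChild)) => Proj(upper ++ lower, grandChild)
  }

  def show(p: Plan): String = p match {
    case Leaf         => "Leaf"
    case Proj(ids, c) => s"P${ids.mkString}(${show(c)})"
  }

  // Builds P1(P2(P3(P4(Leaf)))).
  val chain: Plan = (4 to 1 by -1).foldLeft(Leaf: Plan)((c, i) => Proj(List(i), c))

  def main(args: Array[String]): Unit = {
    val pass1 = transformDown(chain)(collapse)
    val pass2 = transformDown(pass1)(collapse)
    println(show(chain)) // P1(P2(P3(P4(Leaf))))
    println(show(pass1)) // P12(P34(Leaf))
    println(show(pass2)) // P1234(Leaf)
  }
}
```

One pass yields `P12(P34(...))`; only the second pass produces `P1234`, which matches the behaviour described in the comment.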
##########
Review comment:
I think the correct way would be to use `transformDown` in a similar manner to
`recursiveRemoveSort` in #21072.
So basically, when you hit the first `Project`, you collect all consecutive
`Project`s until you hit the `maxCommonExprsInCollapseProject` limit, and then
merge them.
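The proposed top-down approach could look roughly like the sketch below. It is hypothetical: the toy types are not Catalyst classes, it is not the code of `recursiveRemoveSort`, and it assumes the limit is checked by counting how many times the merged upper projection references each non-attribute output of the next lower one. Starting at the topmost projection, it keeps absorbing consecutive child projections until the next merge would exceed the limit, then starts a new group at that child.

```scala
object CollapseDownToy {
  // Hypothetical toy model of projections; these are not Catalyst classes.
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Call(fn: String, args: List[Expr]) extends Expr

  // A projection is an ordered list of (output name, expression) pairs.
  type Proj = List[(String, Expr)]

  def substitute(e: Expr, lower: Map[String, Expr]): Expr = e match {
    case Attr(n)       => lower.getOrElse(n, e)
    case Call(f, args) => Call(f, args.map(substitute(_, lower)))
  }

  def refs(e: Expr, name: String): Int = e match {
    case Attr(n)       => if (n == name) 1 else 0
    case Call(_, args) => args.map(refs(_, name)).sum
  }

  def calls(e: Expr, fn: String): Int = e match {
    case Attr(_)       => 0
    case Call(f, args) => (if (f == fn) 1 else 0) + args.map(calls(_, fn)).sum
  }

  // Assumed check: max references from `upper` to any non-attribute output of `lower`.
  def maxCommonRefs(upper: Proj, lower: Proj): Int = {
    val counts = lower.collect { case (n, _: Call) => upper.map { case (_, e) => refs(e, n) }.sum }
    if (counts.isEmpty) 0 else counts.max
  }

  def merge(upper: Proj, lower: Proj): Proj = {
    val inlined = lower.toMap
    upper.map { case (n, e) => (n, substitute(e, inlined)) }
  }

  // Greedy top-down collapsing. `plan` lists the projections top first.
  def collapseTopDown(plan: List[Proj], limit: Int): List[Proj] = plan match {
    case Nil => Nil
    case top :: rest =>
      // Absorb consecutive children while the next merge stays within the limit.
      @annotation.tailrec
      def absorb(acc: Proj, below: List[Proj]): (Proj, List[Proj]) = below match {
        case child :: more if maxCommonRefs(acc, child) <= limit => absorb(merge(acc, child), more)
        case _ => (acc, below)
      }
      val (merged, remaining) = absorb(top, rest)
      merged :: collapseTopDown(remaining, limit)
  }

  def jsonToStructsCalls(plan: List[Proj]): Int =
    plan.flatMap(_.map(_._2)).map(calls(_, "JsonToStructs")).sum

  private def pass(names: String*): Proj = names.toList.map(n => (n, Attr(n)))
  private def field(i: Int): Expr = Call(s"GetStructField$i", List(Attr("struct")))

  // The chained `withColumn` query, top first (P1, P2, P3, P4).
  val withColumnChain: List[Proj] = List(
    pass("json", "struct", "a", "b") :+ ("c" -> field(2)),
    pass("json", "struct", "a") :+ ("b" -> field(1)),
    pass("json", "struct") :+ ("a" -> field(0)),
    pass("json") :+ ("struct" -> Call("JsonToStructs", List(Attr("json")))))

  def main(args: Array[String]): Unit =
    for (limit <- 1 to 4) {
      val out = collapseTopDown(withColumnChain, limit)
      println(s"limit=$limit projects=${out.size} JsonToStructs=${jsonToStructsCalls(out)}")
    }
}
```

In this model, for limits 1 to 3 the chain ends up as `P123` over `P4`, so `JsonToStructs` is evaluated once; only with a limit of 4 are all four projections merged.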
----------------------------------------------------------------
This is an automated message from the Apache Git Service.