maropu commented on a change in pull request #29950:
URL: https://github.com/apache/spark/pull/29950#discussion_r508931332
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
##########
@@ -124,14 +126,34 @@ object ScanOperation extends OperationHelper with
PredicateHelper {
}.exists(!_.deterministic))
}
+ def moreThanMaxAllowedCommonOutput(
+ expr: Seq[NamedExpression],
+ aliases: AttributeMap[Expression]): Boolean = {
+ val exprMap = mutable.HashMap.empty[Attribute, Int]
+
+ expr.foreach(_.collect {
+ case a: Attribute if aliases.contains(a) => exprMap.update(a,
exprMap.getOrElse(a, 0) + 1)
+ })
+
+ val commonOutputs = if (exprMap.size > 0) {
+ exprMap.maxBy(_._2)._2
+ } else {
+ 0
+ }
+
+ commonOutputs > SQLConf.get.maxCommonExprsInCollapseProject
+ }
+
private def collectProjectsAndFilters(plan: LogicalPlan): ScanReturnType = {
plan match {
case Project(fields, child) =>
collectProjectsAndFilters(child) match {
case Some((_, filters, other, aliases)) =>
// Follow CollapseProject and only keep going if the collected
Projects
- // do not have common non-deterministic expressions.
- if (!hasCommonNonDeterministic(fields, aliases)) {
+ // do not have common non-deterministic expressions, and do not
have more than
+ // maximum allowed common outputs.
+ if (!hasCommonNonDeterministic(fields, aliases) &&
+ !moreThanMaxAllowedCommonOutput(fields, aliases)) {
Review comment:
Doesn't `PhysicalOperation` have the same issue?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -760,12 +757,43 @@ object CollapseProject extends Rule[LogicalPlan] {
s.copy(child = p2.copy(projectList = buildCleanedProjectList(l1,
p2.projectList)))
}
+ private def collapseProjects(plan: LogicalPlan): LogicalPlan = plan match {
+ case p1 @ Project(_, p2: Project) =>
+ val maxCommonExprs = SQLConf.get.maxCommonExprsInCollapseProject
+
+ if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList) ||
+ getLargestNumOfCommonOutput(p1.projectList, p2.projectList) >
maxCommonExprs) {
+ p1
+ } else {
+ collapseProjects(
+ p2.copy(projectList = buildCleanedProjectList(p1.projectList,
p2.projectList)))
+ }
+ case _ => plan
+ }
+
private def collectAliases(projectList: Seq[NamedExpression]):
AttributeMap[Alias] = {
AttributeMap(projectList.collect {
case a: Alias => a.toAttribute -> a
})
}
+ // Counts for the largest times common outputs from lower operator are used
in upper operators.
+ private def getLargestNumOfCommonOutput(
+ upper: Seq[NamedExpression], lower: Seq[NamedExpression]): Int = {
+ val aliases = collectAliases(lower)
+ val exprMap = mutable.HashMap.empty[Attribute, Int]
+
+ upper.foreach(_.collect {
+ case a: Attribute if aliases.contains(a) => exprMap.update(a,
exprMap.getOrElse(a, 0) + 1)
+ })
+
+ if (exprMap.size > 0) {
Review comment:
`nonEmpty` instead?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1926,6 +1926,23 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val MAX_COMMON_EXPRS_IN_COLLAPSE_PROJECT =
+ buildConf("spark.sql.optimizer.maxCommonExprsInCollapseProject")
Review comment:
If we set this value to 1, do all the existing tests still pass?
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
##########
@@ -2567,6 +2568,51 @@ class DataFrameSuite extends QueryTest
val df = l.join(r, $"col2" === $"col4", "LeftOuter")
checkAnswer(df, Row("2", "2"))
}
+
+ test("SPARK-32945: Avoid collapsing projects if reaching max allowed common
exprs") {
+ val options = Map.empty[String, String]
+ val schema = StructType.fromDDL("a int, b int, c long, d string")
+
+ withTable("test_table") {
+ val jsonDF = Seq("""{"a":1, "b":2, "c": 123, "d":
"test"}""").toDF("json")
Review comment:
super nit: `DF` -> `Df`?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1926,6 +1926,23 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val MAX_COMMON_EXPRS_IN_COLLAPSE_PROJECT =
+ buildConf("spark.sql.optimizer.maxCommonExprsInCollapseProject")
+ .doc("An integer number indicates the maximum allowed number of common
input expression " +
+ "from lower Project when being collapsed into upper Project by
optimizer rule " +
+ "`CollapseProject`. Normally `CollapseProject` will collapse adjacent
Project " +
Review comment:
(Just a comment) If `CollapseProject` is listed in
`spark.sql.optimizer.excludedRules`, it seems Spark still respects this
value in `ScanOperation`. That behaviour might be okay, but it looks a bit
weird to me.
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1926,6 +1926,23 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val MAX_COMMON_EXPRS_IN_COLLAPSE_PROJECT =
+ buildConf("spark.sql.optimizer.maxCommonExprsInCollapseProject")
+ .doc("An integer number indicates the maximum allowed number of common
input expression " +
+ "from lower Project when being collapsed into upper Project by
optimizer rule " +
+ "`CollapseProject`. Normally `CollapseProject` will collapse adjacent
Project " +
+ "and merge expressions. But in some edge cases, expensive expressions
might be " +
+ "duplicated many times in merged Project by this optimization. This
config sets " +
+ "a maximum number. Once an expression is duplicated more than this
number " +
+ "if merging two Project, Spark SQL will skip the merging. Note that
normally " +
+ "in whole-stage codegen Project operator will de-duplicate expressions
internally, " +
+ "but in edge cases Spark cannot do whole-stage codegen and fallback to
interpreted " +
+ "mode. In such cases, users can use this config to avoid duplicate
expressions")
+ .version("3.1.0")
+ .intConf
+ .checkValue(m => m > 0, "maxCommonExprsInCollapseProject must be larger
than zero.")
Review comment:
nit: `m => m > 0` -> `_ > 0`
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -760,12 +757,43 @@ object CollapseProject extends Rule[LogicalPlan] {
s.copy(child = p2.copy(projectList = buildCleanedProjectList(l1,
p2.projectList)))
}
+ private def collapseProjects(plan: LogicalPlan): LogicalPlan = plan match {
+ case p1 @ Project(_, p2: Project) =>
+ val maxCommonExprs = SQLConf.get.maxCommonExprsInCollapseProject
+
+ if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList) ||
+ getLargestNumOfCommonOutput(p1.projectList, p2.projectList) >
maxCommonExprs) {
Review comment:
How about following the `ScanOperation` condition like this?
```
case p1 @ Project(_, p2: Project) =>
val maxCommonExprs = SQLConf.get.maxCommonExprsInCollapseProject
if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList) ||
moreThanMaxAllowedCommonOutput(p1.projectList, p2.projectList)) {
```
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
##########
@@ -124,14 +126,34 @@ object ScanOperation extends OperationHelper with
PredicateHelper {
}.exists(!_.deterministic))
}
+ def moreThanMaxAllowedCommonOutput(
+ expr: Seq[NamedExpression],
+ aliases: AttributeMap[Expression]): Boolean = {
+ val exprMap = mutable.HashMap.empty[Attribute, Int]
+
+ expr.foreach(_.collect {
+ case a: Attribute if aliases.contains(a) => exprMap.update(a,
exprMap.getOrElse(a, 0) + 1)
+ })
+
+ val commonOutputs = if (exprMap.size > 0) {
Review comment:
```
if (exprMap.nonEmpty) {
exprMap.maxBy(_._2)._2 > SQLConf.get.maxCommonExprsInCollapseProject
} else {
false
}
```
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1926,6 +1926,23 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val MAX_COMMON_EXPRS_IN_COLLAPSE_PROJECT =
+ buildConf("spark.sql.optimizer.maxCommonExprsInCollapseProject")
+ .doc("An integer number indicates the maximum allowed number of common
input expression " +
+ "from lower Project when being collapsed into upper Project by
optimizer rule " +
+ "`CollapseProject`. Normally `CollapseProject` will collapse adjacent
Project " +
+ "and merge expressions. But in some edge cases, expensive expressions
might be " +
+ "duplicated many times in merged Project by this optimization. This
config sets " +
+ "a maximum number. Once an expression is duplicated more than this
number " +
+ "if merging two Project, Spark SQL will skip the merging. Note that
normally " +
+ "in whole-stage codegen Project operator will de-duplicate expressions
internally, " +
+ "but in edge cases Spark cannot do whole-stage codegen and fallback to
interpreted " +
+ "mode. In such cases, users can use this config to avoid duplicate
expressions")
+ .version("3.1.0")
+ .intConf
+ .checkValue(m => m > 0, "maxCommonExprsInCollapseProject must be larger
than zero.")
Review comment:
nit: `maxCommonExprsInCollapseProject` -> `The value of
maxCommonExprsInCollapseProject`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]