maropu commented on a change in pull request #29950:
URL: https://github.com/apache/spark/pull/29950#discussion_r508931332



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
##########
@@ -124,14 +126,34 @@ object ScanOperation extends OperationHelper with 
PredicateHelper {
     }.exists(!_.deterministic))
   }
 
+  def moreThanMaxAllowedCommonOutput(
+      expr: Seq[NamedExpression],
+      aliases: AttributeMap[Expression]): Boolean = {
+    val exprMap = mutable.HashMap.empty[Attribute, Int]
+
+    expr.foreach(_.collect {
+      case a: Attribute if aliases.contains(a) => exprMap.update(a, 
exprMap.getOrElse(a, 0) + 1)
+    })
+
+    val commonOutputs = if (exprMap.size > 0) {
+      exprMap.maxBy(_._2)._2
+    } else {
+      0
+    }
+
+    commonOutputs > SQLConf.get.maxCommonExprsInCollapseProject
+  }
+
   private def collectProjectsAndFilters(plan: LogicalPlan): ScanReturnType = {
     plan match {
       case Project(fields, child) =>
         collectProjectsAndFilters(child) match {
           case Some((_, filters, other, aliases)) =>
             // Follow CollapseProject and only keep going if the collected 
Projects
-            // do not have common non-deterministic expressions.
-            if (!hasCommonNonDeterministic(fields, aliases)) {
+            // do not have common non-deterministic expressions, and do not 
have more than
+            // maximum allowed common outputs.
+            if (!hasCommonNonDeterministic(fields, aliases) &&
+                !moreThanMaxAllowedCommonOutput(fields, aliases)) {

Review comment:
       `PhysicalOperation` does not have the same issue?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -760,12 +757,43 @@ object CollapseProject extends Rule[LogicalPlan] {
       s.copy(child = p2.copy(projectList = buildCleanedProjectList(l1, 
p2.projectList)))
   }
 
+  private def collapseProjects(plan: LogicalPlan): LogicalPlan = plan match {
+    case p1 @ Project(_, p2: Project) =>
+      val maxCommonExprs = SQLConf.get.maxCommonExprsInCollapseProject
+
+      if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList) ||
+          getLargestNumOfCommonOutput(p1.projectList, p2.projectList) > 
maxCommonExprs) {
+        p1
+      } else {
+        collapseProjects(
+          p2.copy(projectList = buildCleanedProjectList(p1.projectList, 
p2.projectList)))
+      }
+    case _ => plan
+  }
+
   private def collectAliases(projectList: Seq[NamedExpression]): 
AttributeMap[Alias] = {
     AttributeMap(projectList.collect {
       case a: Alias => a.toAttribute -> a
     })
   }
 
+  // Counts for the largest times common outputs from lower operator are used 
in upper operators.
+  private def getLargestNumOfCommonOutput(
+      upper: Seq[NamedExpression], lower: Seq[NamedExpression]): Int = {
+    val aliases = collectAliases(lower)
+    val exprMap = mutable.HashMap.empty[Attribute, Int]
+
+    upper.foreach(_.collect {
+      case a: Attribute if aliases.contains(a) => exprMap.update(a, 
exprMap.getOrElse(a, 0) + 1)
+    })
+
+    if (exprMap.size > 0) {

Review comment:
       `nonEmpty` instead?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1926,6 +1926,23 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val MAX_COMMON_EXPRS_IN_COLLAPSE_PROJECT =
+    buildConf("spark.sql.optimizer.maxCommonExprsInCollapseProject")

Review comment:
       If we set this value to 1, all the existing tests can pass?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
##########
@@ -2567,6 +2568,51 @@ class DataFrameSuite extends QueryTest
     val df = l.join(r, $"col2" === $"col4", "LeftOuter")
     checkAnswer(df, Row("2", "2"))
   }
+
+  test("SPARK-32945: Avoid collapsing projects if reaching max allowed common 
exprs") {
+    val options = Map.empty[String, String]
+    val schema = StructType.fromDDL("a int, b int, c long, d string")
+
+    withTable("test_table") {
+      val jsonDF = Seq("""{"a":1, "b":2, "c": 123, "d": 
"test"}""").toDF("json")

Review comment:
       super nit: `DF` -> `Df`?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1926,6 +1926,23 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val MAX_COMMON_EXPRS_IN_COLLAPSE_PROJECT =
+    buildConf("spark.sql.optimizer.maxCommonExprsInCollapseProject")
+      .doc("An integer number indicates the maximum allowed number of common 
input expression " +
+        "from lower Project when being collapsed into upper Project by 
optimizer rule " +
+        "`CollapseProject`. Normally `CollapseProject` will collapse adjacent 
Project " +

Review comment:
       (Just a comment) If we set `CollapseProject` in 
`spark.sql.optimizer.excludedRules`, it seems like Spark still respects this 
value in `ScanOperation`? That behaviour might be okay, but it looks a bit 
weird to me.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1926,6 +1926,23 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val MAX_COMMON_EXPRS_IN_COLLAPSE_PROJECT =
+    buildConf("spark.sql.optimizer.maxCommonExprsInCollapseProject")
+      .doc("An integer number indicates the maximum allowed number of common 
input expression " +
+        "from lower Project when being collapsed into upper Project by 
optimizer rule " +
+        "`CollapseProject`. Normally `CollapseProject` will collapse adjacent 
Project " +
+        "and merge expressions. But in some edge cases, expensive expressions 
might be " +
+        "duplicated many times in merged Project by this optimization. This 
config sets " +
+        "a maximum number. Once an expression is duplicated more than this 
number " +
+        "if merging two Project, Spark SQL will skip the merging. Note that 
normally " +
+        "in whole-stage codegen Project operator will de-duplicate expressions 
internally, " +
+        "but in edge cases Spark cannot do whole-stage codegen and fallback to 
interpreted " +
+        "mode. In such cases, users can use this config to avoid duplicate 
expressions")
+      .version("3.1.0")
+      .intConf
+      .checkValue(m => m > 0, "maxCommonExprsInCollapseProject must be larger 
than zero.")

Review comment:
       nit: `m => m > 0` -> `_ > 0`

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -760,12 +757,43 @@ object CollapseProject extends Rule[LogicalPlan] {
       s.copy(child = p2.copy(projectList = buildCleanedProjectList(l1, 
p2.projectList)))
   }
 
+  private def collapseProjects(plan: LogicalPlan): LogicalPlan = plan match {
+    case p1 @ Project(_, p2: Project) =>
+      val maxCommonExprs = SQLConf.get.maxCommonExprsInCollapseProject
+
+      if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList) ||
+          getLargestNumOfCommonOutput(p1.projectList, p2.projectList) > 
maxCommonExprs) {

Review comment:
       How about following the `ScanOperation` condition like this?
   ```
       case p1 @ Project(_, p2: Project) =>
         val maxCommonExprs = SQLConf.get.maxCommonExprsInCollapseProject
   
         if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList) ||
             moreThanMaxAllowedCommonOutput(p1.projectList, p2.projectList)) {
   ```

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
##########
@@ -124,14 +126,34 @@ object ScanOperation extends OperationHelper with 
PredicateHelper {
     }.exists(!_.deterministic))
   }
 
+  def moreThanMaxAllowedCommonOutput(
+      expr: Seq[NamedExpression],
+      aliases: AttributeMap[Expression]): Boolean = {
+    val exprMap = mutable.HashMap.empty[Attribute, Int]
+
+    expr.foreach(_.collect {
+      case a: Attribute if aliases.contains(a) => exprMap.update(a, 
exprMap.getOrElse(a, 0) + 1)
+    })
+
+    val commonOutputs = if (exprMap.size > 0) {

Review comment:
       ```
   
       if (exprMap.nonEmpty) {
         exprMap.maxBy(_._2)._2 > SQLConf.get.maxCommonExprsInCollapseProject
       } else {
         false
       }
   ```

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1926,6 +1926,23 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val MAX_COMMON_EXPRS_IN_COLLAPSE_PROJECT =
+    buildConf("spark.sql.optimizer.maxCommonExprsInCollapseProject")
+      .doc("An integer number indicates the maximum allowed number of common 
input expression " +
+        "from lower Project when being collapsed into upper Project by 
optimizer rule " +
+        "`CollapseProject`. Normally `CollapseProject` will collapse adjacent 
Project " +
+        "and merge expressions. But in some edge cases, expensive expressions 
might be " +
+        "duplicated many times in merged Project by this optimization. This 
config sets " +
+        "a maximum number. Once an expression is duplicated more than this 
number " +
+        "if merging two Project, Spark SQL will skip the merging. Note that 
normally " +
+        "in whole-stage codegen Project operator will de-duplicate expressions 
internally, " +
+        "but in edge cases Spark cannot do whole-stage codegen and fallback to 
interpreted " +
+        "mode. In such cases, users can use this config to avoid duplicate 
expressions")
+      .version("3.1.0")
+      .intConf
+      .checkValue(m => m > 0, "maxCommonExprsInCollapseProject must be larger 
than zero.")

Review comment:
       nit: `maxCommonExprsInCollapseProject` -> `The value of 
maxCommonExprsInCollapseProject `




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to