karuppayya commented on a change in pull request #28424:
URL: https://github.com/apache/spark/pull/28424#discussion_r421681693



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1553,13 +1553,84 @@ object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
  * 1. This rule is only applicable to INTERSECT DISTINCT. Do not use it for INTERSECT ALL.
  * 2. This rule has to be done after de-duplicating the attributes; otherwise, the generated
  *    join conditions will be incorrect.
+ *
+ * This rule also pushed down the Distinct to left/right side based on the config
+ *   `SQLConf.OPTIMIZE_INTERSECT_ENABLED`
+ * 1. It pushes down Distinct on left side if applying a Distinct on left reduces rows by
+ *   a configured threshold
+ * 2. It pushes down Distinct on right side if applying a Distinct on right side converts the join
+ *   type to broadcast join, or it reduces rows by a configured threshold
+ * If Distinct is pushdown on left side, it won't apply Distinct after Join as it is not needed.
  */
 object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] {
+  private def canBroadcast(plan: LogicalPlan): Boolean = {
+    plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= SQLConf.get.autoBroadcastJoinThreshold
+  }
+
+  private def shouldApplyDistinctToRightSide(right: LogicalPlan): Boolean = {
+    if (!SQLConf.get.optimizeIntersectEnabled) {
+      false
+    } else if (!canBroadcast(right) && canBroadcast(Distinct(right))) {

Review comment:
       Should we check only `canBroadcast(Distinct(right))`, since `canBroadcast(right)` will be checked during physical planning in `JoinSelection`? That way we could avoid having to copy the `canBroadcast` logic.
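A minimal sketch of the difference between the two conditions, using plain byte counts instead of a `LogicalPlan`. `SideSizes` and `RightSideCheck` are hypothetical names for illustration, and `threshold` stands in for `SQLConf.get.autoBroadcastJoinThreshold`. It suggests that the simplified check also fires when `right` is already under the threshold, so the rule would add a Distinct to a side that `JoinSelection` could broadcast anyway.

```scala
// Hypothetical stand-in for the sizes the rule compares (Spark reads them from plan.stats.sizeInBytes).
final case class SideSizes(rightBytes: BigInt, distinctRightBytes: BigInt)

object RightSideCheck {
  // Same test as the PR's canBroadcast: a non-negative size at or below the broadcast threshold.
  def canBroadcast(sizeInBytes: BigInt, threshold: Long): Boolean =
    sizeInBytes >= 0 && sizeInBytes <= threshold

  // Condition as written in the PR: Distinct must be what makes the right side broadcastable.
  def asWritten(s: SideSizes, threshold: Long): Boolean =
    !canBroadcast(s.rightBytes, threshold) && canBroadcast(s.distinctRightBytes, threshold)

  // Suggested condition: only ask whether the de-duplicated right side fits under the threshold.
  def simplified(s: SideSizes, threshold: Long): Boolean =
    canBroadcast(s.distinctRightBytes, threshold)
}
```

With a threshold of 100 bytes, `SideSizes(500, 80)` satisfies both conditions, while `SideSizes(60, 40)` satisfies only the simplified one. Whether an extra Distinct on an already-small side pays off is the trade-off behind the question; the sketch only makes the behavioral difference visible.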

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1553,13 +1553,84 @@ object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
  * 1. This rule is only applicable to INTERSECT DISTINCT. Do not use it for INTERSECT ALL.
  * 2. This rule has to be done after de-duplicating the attributes; otherwise, the generated
  *    join conditions will be incorrect.
+ *
+ * This rule also pushed down the Distinct to left/right side based on the config
+ *   `SQLConf.OPTIMIZE_INTERSECT_ENABLED`
+ * 1. It pushes down Distinct on left side if applying a Distinct on left reduces rows by
+ *   a configured threshold
+ * 2. It pushes down Distinct on right side if applying a Distinct on right side converts the join
+ *   type to broadcast join, or it reduces rows by a configured threshold
+ * If Distinct is pushdown on left side, it won't apply Distinct after Join as it is not needed.
  */
 object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] {

Review comment:
       1. Should we write a separate rule so that all queries with a distinct followed by a left semi join benefit? (for example, `select distinct(a.col1) from a left semi join b on a.col2 = b.col2`)
       2. Can we make it generic so that a distinct followed by a right semi join is also handled?
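For point 1, the correctness argument can be checked on plain Scala collections. The sketch below models the example query with in-memory rows; `RowA`, `RowB`, and `leftSemiJoin` are hypothetical names, not Spark APIs. It suggests that de-duplicating either input of a left semi join does not change the result, while the outer distinct is still required once the output is projected to `col1`.

```scala
object SemiJoinDistinctDemo {
  // Hypothetical row types for tables `a` and `b` in the example query.
  final case class RowA(col1: Int, col2: Int)
  final case class RowB(col2: Int)

  // Left semi join on a.col2 = b.col2: keep each row of `a` that has at least one match in `b`.
  def leftSemiJoin(a: Seq[RowA], b: Seq[RowB]): Seq[RowA] = {
    val matchingKeys = b.map(_.col2).toSet
    a.filter(row => matchingKeys.contains(row.col2))
  }

  val a = Seq(RowA(1, 10), RowA(1, 10), RowA(1, 20), RowA(2, 30))
  val b = Seq(RowB(10), RowB(10), RowB(20))

  // select distinct(a.col1) from a left semi join b on a.col2 = b.col2
  val asWritten: Seq[Int] = leftSemiJoin(a, b).map(_.col1).distinct

  // Both inputs de-duplicated before the join; the outer distinct is kept.
  val pushedDown: Seq[Int] = leftSemiJoin(a.distinct, b.distinct).map(_.col1).distinct

  // Same, but without the outer distinct: the projection to col1 reintroduces duplicates.
  val withoutOuterDistinct: Seq[Int] = leftSemiJoin(a.distinct, b.distinct).map(_.col1)
}
```

Deduplicating `b` is safe because a semi join only checks whether a match exists; deduplicating `a` is safe because each left row is kept or dropped independently. A generic rule would therefore still need to keep the Distinct above the join whenever a projection sits between them.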

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1553,13 +1553,84 @@ object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
  * 1. This rule is only applicable to INTERSECT DISTINCT. Do not use it for INTERSECT ALL.
  * 2. This rule has to be done after de-duplicating the attributes; otherwise, the generated
  *    join conditions will be incorrect.
+ *
+ * This rule also pushed down the Distinct to left/right side based on the config
+ *   `SQLConf.OPTIMIZE_INTERSECT_ENABLED`
+ * 1. It pushes down Distinct on left side if applying a Distinct on left reduces rows by
+ *   a configured threshold
+ * 2. It pushes down Distinct on right side if applying a Distinct on right side converts the join
+ *   type to broadcast join, or it reduces rows by a configured threshold
+ * If Distinct is pushdown on left side, it won't apply Distinct after Join as it is not needed.
  */
 object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] {
+  private def canBroadcast(plan: LogicalPlan): Boolean = {
+    plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= SQLConf.get.autoBroadcastJoinThreshold
+  }
+
+  private def shouldApplyDistinctToRightSide(right: LogicalPlan): Boolean = {
+    if (!SQLConf.get.optimizeIntersectEnabled) {
+      false
+    } else if (!canBroadcast(right) && canBroadcast(Distinct(right))) {
+      true
+    } else {
+      shouldApplyDistinctBasedOnReductionThreshold(right)
+    }
+  }
+
+  private def shouldApplyDistinctToLeftSide(left: LogicalPlan): Boolean = {
+    if (!SQLConf.get.optimizeIntersectEnabled) {
+      false
+    } else {
+      shouldApplyDistinctBasedOnReductionThreshold(left)
+    }
+  }
+
+  private def shouldApplyDistinctBasedOnReductionThreshold(plan: LogicalPlan): Boolean = {
+    (plan.stats.rowCount, Distinct(plan).stats.rowCount) match {
+      case (Some(rowCountBefore), Some(rowCountAfter)) =>
+        val reductionRatio = if (rowCountAfter != 0) {
+          rowCountBefore / rowCountAfter
+        } else {
+          // If rowCountAfter=0, Most likely rowCountBefore should also be 0 as applying Distinct()
+          // can make rows 0 only when initial input has 0 rows
+          // Making reductionRatio = rowCountBefore to be on safe side
+          rowCountBefore
+        }
+        if (reductionRatio >= SQLConf.get.optimizeIntersectDistinctReductionThreshold) {
+          true
+        } else {
+          false
+        }
+      case (_, _) =>
+        // Stats are not available
+        false
+    }
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case Intersect(left, right, false) =>

Review comment:
       Should we leave the old code as is and add a separate `case` branch that is activated only when `SQLConf.get.optimizeIntersectEnabled` is set? That way we could avoid checking the flag in each of the methods.
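A minimal sketch of that structure on a toy plan ADT. `Plan`, `Relation`, `LeftSemiJoin`, and the `pushLeft`/`pushRight` predicates are hypothetical stand-ins for Spark's `LogicalPlan`, `Join(..., LeftSemi, ...)`, and the PR's `shouldApplyDistinctTo*Side` helpers. The flag is read once, in a pattern guard, and the unguarded branch keeps the existing Distinct over a left semi join. Unlike Spark's `plan transform`, this sketch rewrites only the root node.

```scala
sealed trait Plan
final case class Relation(name: String) extends Plan
final case class Distinct(child: Plan) extends Plan
final case class Intersect(left: Plan, right: Plan, isAll: Boolean) extends Plan
final case class LeftSemiJoin(left: Plan, right: Plan) extends Plan

object ToyReplaceIntersect {
  def apply(
      plan: Plan,
      optimizeEnabled: Boolean,
      pushLeft: Plan => Boolean,
      pushRight: Plan => Boolean): Plan = plan match {
    // New branch, only reachable when the flag is on; the helpers no longer re-check it.
    case Intersect(left, right, false) if optimizeEnabled =>
      val distinctOnLeft = pushLeft(left)
      val newLeft = if (distinctOnLeft) Distinct(left) else left
      val newRight = if (pushRight(right)) Distinct(right) else right
      val join = LeftSemiJoin(newLeft, newRight)
      // Per the PR's doc comment, no Distinct is needed above the join once the left side is de-duplicated.
      if (distinctOnLeft) join else Distinct(join)
    // Existing behavior, left untouched: Distinct over a left semi join.
    case Intersect(left, right, false) =>
      Distinct(LeftSemiJoin(left, right))
    case other => other
  }
}
```

With the flag off, `Intersect(t1, t2, false)` becomes `Distinct(LeftSemiJoin(t1, t2))` exactly as before, which keeps the default path easy to review separately from the new one.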

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1553,13 +1553,84 @@ object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
  * 1. This rule is only applicable to INTERSECT DISTINCT. Do not use it for INTERSECT ALL.
  * 2. This rule has to be done after de-duplicating the attributes; otherwise, the generated
  *    join conditions will be incorrect.
+ *
+ * This rule also pushed down the Distinct to left/right side based on the config
+ *   `SQLConf.OPTIMIZE_INTERSECT_ENABLED`
+ * 1. It pushes down Distinct on left side if applying a Distinct on left reduces rows by
+ *   a configured threshold
+ * 2. It pushes down Distinct on right side if applying a Distinct on right side converts the join
+ *   type to broadcast join, or it reduces rows by a configured threshold
+ * If Distinct is pushdown on left side, it won't apply Distinct after Join as it is not needed.
  */
 object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] {
+  private def canBroadcast(plan: LogicalPlan): Boolean = {
+    plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= SQLConf.get.autoBroadcastJoinThreshold
+  }
+
+  private def shouldApplyDistinctToRightSide(right: LogicalPlan): Boolean = {
+    if (!SQLConf.get.optimizeIntersectEnabled) {
+      false
+    } else if (!canBroadcast(right) && canBroadcast(Distinct(right))) {
+      true
+    } else {
+      shouldApplyDistinctBasedOnReductionThreshold(right)
+    }
+  }
+
+  private def shouldApplyDistinctToLeftSide(left: LogicalPlan): Boolean = {
+    if (!SQLConf.get.optimizeIntersectEnabled) {

Review comment:
       nit: `SQLConf.get.optimizeIntersectEnabled && shouldApplyDistinctBasedOnReductionThreshold(left)`
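The suggested form also makes the short-circuit explicit: with `&&`, the stats-based check is never evaluated when the flag is off. A minimal sketch, where the hypothetical by-name `reductionCheck` parameter stands in for `shouldApplyDistinctBasedOnReductionThreshold(left)`:

```scala
object LeftSideCheck {
  // `reductionCheck` stands in for shouldApplyDistinctBasedOnReductionThreshold(left), which reads
  // plan statistics; as a by-name parameter it is evaluated only if the flag is true.
  def shouldApplyDistinctToLeftSide(optimizeIntersectEnabled: Boolean, reductionCheck: => Boolean): Boolean =
    optimizeIntersectEnabled && reductionCheck
}
```

Passing a `reductionCheck` that throws, together with `optimizeIntersectEnabled = false`, returns `false` without throwing, which is the same evaluation order the one-line version in the rule would have.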

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1553,13 +1553,84 @@ object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
  * 1. This rule is only applicable to INTERSECT DISTINCT. Do not use it for INTERSECT ALL.
  * 2. This rule has to be done after de-duplicating the attributes; otherwise, the generated
  *    join conditions will be incorrect.
+ *
+ * This rule also pushed down the Distinct to left/right side based on the config
+ *   `SQLConf.OPTIMIZE_INTERSECT_ENABLED`
+ * 1. It pushes down Distinct on left side if applying a Distinct on left reduces rows by
+ *   a configured threshold
+ * 2. It pushes down Distinct on right side if applying a Distinct on right side converts the join
+ *   type to broadcast join, or it reduces rows by a configured threshold
+ * If Distinct is pushdown on left side, it won't apply Distinct after Join as it is not needed.
  */
 object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] {
+  private def canBroadcast(plan: LogicalPlan): Boolean = {
+    plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= SQLConf.get.autoBroadcastJoinThreshold
+  }
+
+  private def shouldApplyDistinctToRightSide(right: LogicalPlan): Boolean = {
+    if (!SQLConf.get.optimizeIntersectEnabled) {
+      false
+    } else if (!canBroadcast(right) && canBroadcast(Distinct(right))) {
+      true
+    } else {
+      shouldApplyDistinctBasedOnReductionThreshold(right)
+    }
+  }
+
+  private def shouldApplyDistinctToLeftSide(left: LogicalPlan): Boolean = {
+    if (!SQLConf.get.optimizeIntersectEnabled) {
+      false
+    } else {
+      shouldApplyDistinctBasedOnReductionThreshold(left)
+    }
+  }
+
+  private def shouldApplyDistinctBasedOnReductionThreshold(plan: LogicalPlan): Boolean = {
+    (plan.stats.rowCount, Distinct(plan).stats.rowCount) match {
+      case (Some(rowCountBefore), Some(rowCountAfter)) =>
+        val reductionRatio = if (rowCountAfter != 0) {
+          rowCountBefore / rowCountAfter
+        } else {
+          // If rowCountAfter=0, Most likely rowCountBefore should also be 0 as applying Distinct()
+          // can make rows 0 only when initial input has 0 rows
+          // Making reductionRatio = rowCountBefore to be on safe side
+          rowCountBefore
+        }
+        if (reductionRatio >= SQLConf.get.optimizeIntersectDistinctReductionThreshold) {

Review comment:
       nit: remove the `if`/`else`; returning `reductionRatio >= SQLConf.get.optimizeIntersectDistinctReductionThreshold` directly should do.
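A minimal sketch of the helper with the `if`/`else` removed (and with the wildcard `case _` that another nit in this review proposes). It takes the two row counts directly instead of a `LogicalPlan`, and the `Long` type of `threshold` is an assumption, since the diff does not show the type of `optimizeIntersectDistinctReductionThreshold`. Because Spark's `Statistics.rowCount` is an `Option[BigInt]`, `rowCountBefore / rowCountAfter` is integer division, so a ratio such as 199 / 100 truncates to 1.

```scala
object ReductionCheck {
  // Row counts mirror Option[BigInt] statistics; the Long threshold is an assumption for this sketch.
  def shouldApplyDistinctBasedOnReductionThreshold(
      rowCountBefore: Option[BigInt],
      rowCountAfter: Option[BigInt],
      threshold: Long): Boolean =
    (rowCountBefore, rowCountAfter) match {
      case (Some(before), Some(after)) =>
        // Same zero guard as the PR: fall back to the input row count when Distinct reports 0 rows.
        val reductionRatio = if (after != 0) before / after else before
        // Nit applied: return the comparison instead of `if (...) true else false`.
        reductionRatio >= threshold
      case _ =>
        // Stats are not available.
        false
    }
}
```

With 1000 input rows collapsing to 100 and a threshold of 5, the ratio is 10 and the helper returns `true`; with 199 rows collapsing to 100 and a threshold of 2, the truncated ratio of 1 makes it return `false`.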

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1553,13 +1553,84 @@ object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
  * 1. This rule is only applicable to INTERSECT DISTINCT. Do not use it for INTERSECT ALL.
  * 2. This rule has to be done after de-duplicating the attributes; otherwise, the generated
  *    join conditions will be incorrect.
+ *
+ * This rule also pushed down the Distinct to left/right side based on the config

Review comment:
       1. Can we mention that CBO needs to be enabled?
       2. Also, column stats are required; is that right?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1553,13 +1553,84 @@ object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
  * 1. This rule is only applicable to INTERSECT DISTINCT. Do not use it for INTERSECT ALL.
  * 2. This rule has to be done after de-duplicating the attributes; otherwise, the generated
  *    join conditions will be incorrect.
+ *
+ * This rule also pushed down the Distinct to left/right side based on the config
+ *   `SQLConf.OPTIMIZE_INTERSECT_ENABLED`
+ * 1. It pushes down Distinct on left side if applying a Distinct on left reduces rows by
+ *   a configured threshold
+ * 2. It pushes down Distinct on right side if applying a Distinct on right side converts the join
+ *   type to broadcast join, or it reduces rows by a configured threshold
+ * If Distinct is pushdown on left side, it won't apply Distinct after Join as it is not needed.
  */
 object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] {
+  private def canBroadcast(plan: LogicalPlan): Boolean = {
+    plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= SQLConf.get.autoBroadcastJoinThreshold
+  }
+
+  private def shouldApplyDistinctToRightSide(right: LogicalPlan): Boolean = {
+    if (!SQLConf.get.optimizeIntersectEnabled) {
+      false
+    } else if (!canBroadcast(right) && canBroadcast(Distinct(right))) {
+      true
+    } else {
+      shouldApplyDistinctBasedOnReductionThreshold(right)
+    }
+  }
+
+  private def shouldApplyDistinctToLeftSide(left: LogicalPlan): Boolean = {
+    if (!SQLConf.get.optimizeIntersectEnabled) {
+      false
+    } else {
+      shouldApplyDistinctBasedOnReductionThreshold(left)
+    }
+  }
+
+  private def shouldApplyDistinctBasedOnReductionThreshold(plan: LogicalPlan): Boolean = {
+    (plan.stats.rowCount, Distinct(plan).stats.rowCount) match {
+      case (Some(rowCountBefore), Some(rowCountAfter)) =>
+        val reductionRatio = if (rowCountAfter != 0) {
+          rowCountBefore / rowCountAfter
+        } else {
+          // If rowCountAfter=0, Most likely rowCountBefore should also be 0 as applying Distinct()
+          // can make rows 0 only when initial input has 0 rows
+          // Making reductionRatio = rowCountBefore to be on safe side
+          rowCountBefore
+        }
+        if (reductionRatio >= SQLConf.get.optimizeIntersectDistinctReductionThreshold) {
+          true
+        } else {
+          false
+        }
+      case (_, _) =>

Review comment:
       nit: `case _ =>`



