maryannxue commented on a change in pull request #28424:
URL: https://github.com/apache/spark/pull/28424#discussion_r439518296



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1553,13 +1553,84 @@ object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
  * 1. This rule is only applicable to INTERSECT DISTINCT. Do not use it for INTERSECT ALL.
  * 2. This rule has to be done after de-duplicating the attributes; otherwise, the generated
  *    join conditions will be incorrect.
+ *
+ * This rule also pushed down the Distinct to left/right side based on the config
+ *   `SQLConf.OPTIMIZE_INTERSECT_ENABLED`
+ * 1. It pushes down Distinct on left side if applying a Distinct on left reduces rows by
+ *   a configured threshold
+ * 2. It pushes down Distinct on right side if applying a Distinct on right side converts the join
+ *   type to broadcast join, or it reduces rows by a configured threshold
+ * If Distinct is pushdown on left side, it won't apply Distinct after Join as it is not needed.
  */
 object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] {
+  private def canBroadcast(plan: LogicalPlan): Boolean = {

Review comment:
       Now that we have `JoinSelectionHelper`, can we use its methods here instead of a private `canBroadcast`?
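
For readers following the thread, the decision described in the doc comment of the hunk can be modelled roughly as below. This is only a dependency-free sketch, not the PR's code and not Spark's API: `SideStats`, `IntersectConf`, `IntersectPushdownModel`, and every field name are hypothetical, and the "configured threshold" is assumed here to be a minimum fraction of rows removed by the Distinct. The `canBroadcast` function stands in for the size check that the comment suggests taking from `JoinSelectionHelper` rather than re-implementing privately.

```scala
// Hypothetical model of the pushdown decision; none of these names exist in Spark or in the PR.
final case class SideStats(
    rowCount: Long,            // estimated rows before Distinct
    distinctRowCount: Long,    // estimated rows after Distinct
    sizeInBytes: Long,         // estimated size before Distinct
    distinctSizeInBytes: Long) // estimated size after Distinct

final case class IntersectConf(
    enabled: Boolean,              // stands in for the optimization's on/off config
    minRowReductionRatio: Double,  // assumed meaning of "a configured threshold"
    broadcastThresholdBytes: Long) // assumed broadcast size limit

object IntersectPushdownModel {
  // Size-based broadcast check (the kind of logic a shared helper would own).
  def canBroadcast(sizeInBytes: Long, conf: IntersectConf): Boolean =
    sizeInBytes >= 0 && sizeInBytes <= conf.broadcastThresholdBytes

  // True when Distinct removes at least the configured fraction of rows.
  def reducesEnough(side: SideStats, conf: IntersectConf): Boolean =
    side.rowCount > 0 &&
      (side.rowCount - side.distinctRowCount).toDouble / side.rowCount >=
        conf.minRowReductionRatio

  // Point 1 of the doc comment: push Distinct to the left side.
  def pushDistinctLeft(left: SideStats, conf: IntersectConf): Boolean =
    conf.enabled && reducesEnough(left, conf)

  // Point 2: push Distinct to the right side if it makes the side
  // broadcastable when it was not before, or if it reduces rows enough.
  def pushDistinctRight(right: SideStats, conf: IntersectConf): Boolean = {
    val becomesBroadcastable =
      !canBroadcast(right.sizeInBytes, conf) &&
        canBroadcast(right.distinctSizeInBytes, conf)
    conf.enabled && (becomesBroadcastable || reducesEnough(right, conf))
  }

  // Last sentence of the doc comment: no Distinct after the join
  // when the left side has already been de-duplicated.
  def needsDistinctAfterJoin(left: SideStats, conf: IntersectConf): Boolean =
    !pushDistinctLeft(left, conf)
}
```

Keeping the broadcast check as a single function with no rule-specific state is what would make it easy to swap for a shared helper, which is the point of the question above.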



