Github user dilipbiswal commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22326#discussion_r220440612
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
    @@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] 
with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, 
j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * Correctly handle PythonUDF which needs access to both sides of the join by
    + * changing the join type to Cross.
    + */
    +object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with 
PredicateHelper {
    +  def hasPythonUDF(expression: Expression): Boolean = {
    +    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
    +  }
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    +    case j @ Join(_, _, joinType, condition)
    +      if 
condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) =>
    --- End diff --
    
    @cloud-fan Just saw your comment, which validated my understanding that this
rule relies on pushing predicates down through the join. However,
pushdownPredicateThroughJoin is not in the nonExcludableRules list, so can we
rely on that rule always being fired here?
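    The check the quoted rule performs can be sketched outside Catalyst as a toy
model: split the join condition on conjunctive `And` nodes (as
`PredicateHelper.splitConjunctivePredicates` does), then scan each predicate's
subtree for a `PythonUDF` node. The classes below are hypothetical stand-ins
for Catalyst's `Expression` tree, not Spark's real API:

```python
# Toy sketch of the predicate check in HandlePythonUDFInJoinCondition.
# The Expr subclasses are hypothetical stand-ins for Catalyst expressions.

class Expr:
    def __init__(self, *children):
        self.children = children

class And(Expr): pass        # conjunction of two predicates
class EqualTo(Expr): pass    # an ordinary equi-join predicate
class Column(Expr): pass     # a leaf column reference
class PythonUDF(Expr): pass  # a Python UDF call in the condition

def split_conjunctive_predicates(cond):
    # Mirrors PredicateHelper.splitConjunctivePredicates: flatten nested Ands.
    if isinstance(cond, And):
        left, right = cond.children
        return split_conjunctive_predicates(left) + split_conjunctive_predicates(right)
    return [cond]

def has_python_udf(expr):
    # Mirrors expression.collectFirst { case udf: PythonUDF => udf }.isDefined
    if isinstance(expr, PythonUDF):
        return True
    return any(has_python_udf(c) for c in expr.children)

# Condition shaped like: a.x = b.x AND my_udf(a.y, b.y)
condition = And(EqualTo(Column(), Column()), PythonUDF(Column(), Column()))
predicates = split_conjunctive_predicates(condition)
print(len(predicates))                              # 2
print(any(has_python_udf(p) for p in predicates))   # True
```

    Whether the rule actually fires on the expected predicates, though, is exactly
the question above: it assumes predicate pushdown has already run.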


---
