Github user dilipbiswal commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22326#discussion_r220438880
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
    @@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] 
with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, 
j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * Correctly handle PythonUDF which need access both side of join side by 
changing the new join
    + * type to Cross.
    + */
    +object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with 
PredicateHelper {
    +  def hasPythonUDF(expression: Expression): Boolean = {
    +    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
    +  }
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    +    case j @ Join(_, _, joinType, condition)
    +      if 
condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) =>
    --- End diff --
    
    @xuanyuanking Oh..  it is because the UDFS referring to single leg would 
have been pushed down and we will only have UDFs referring to both legs in the 
join condition when we come here ?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to