Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r220438880 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType)) } } + +/** + * Correctly handle PythonUDF which need access both side of join side by changing the new join + * type to Cross. + */ +object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper { + def hasPythonUDF(expression: Expression): Boolean = { + expression.collectFirst { case udf: PythonUDF => udf }.isDefined + } + + override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { + case j @ Join(_, _, joinType, condition) + if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) => --- End diff -- @xuanyuanking Oh, I see. Is it because any UDF referring to only a single leg would already have been pushed down, so by the time we get here the join condition can only contain UDFs that refer to both legs?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org