Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r220440612 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType)) } } + +/** + * Correctly handle PythonUDF which need access both side of join side by changing the new join + * type to Cross. + */ +object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper { + def hasPythonUDF(expression: Expression): Boolean = { + expression.collectFirst { case udf: PythonUDF => udf }.isDefined + } + + override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { + case j @ Join(_, _, joinType, condition) + if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) => --- End diff -- @cloud-fan Just saw your comment that validated my understanding that this rule relies on pushing down predicates through the join. However, the PushDownPredicateThroughJoin rule is not in the nonExcludableRules list, so can we rely on that rule always being fired here?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org