Github user dilipbiswal commented on a diff in the pull request:
https://github.com/apache/spark/pull/22326#discussion_r220438880
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDFs which need access to both sides of the join by changing the
+ * join type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) =>
--- End diff ---
@xuanyuanking Oh.. is it because the UDFs referring to a single leg would already have been pushed down, so by the time we get here the join condition only contains UDFs referring to both legs?
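
To make that reasoning concrete, here is a toy sketch in plain Scala (not Catalyst; `Attr`, `PyUdf` and `pushable` are made-up stand-ins for attribute references, `PythonUDF` and the pushdown check): a conjunct whose references touch only one leg can be pushed below the join, so any PythonUDF still sitting in the join condition must reference both legs.

sealed trait Expr { def refs: Set[String] }
case class Attr(name: String, side: String) extends Expr { def refs: Set[String] = Set(side) }
case class PyUdf(children: Seq[Expr]) extends Expr { def refs: Set[String] = children.flatMap(_.refs).toSet }
case class EqualTo(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }

// A conjunct that touches only one leg can be pushed below the join.
def pushable(conjunct: Expr): Boolean = conjunct.refs.size <= 1

val conjuncts = Seq(
  EqualTo(Attr("a", "left"), Attr("b", "right")),    // equi-join key: stays in the join
  PyUdf(Seq(Attr("c", "left"))),                     // single-leg UDF: pushed below the join
  PyUdf(Seq(Attr("a", "left"), Attr("b", "right")))  // two-leg UDF: stays in the condition
)
val (pushedDown, keptInJoin) = conjuncts.partition(pushable)
// keptInJoin still holds a PyUdf referencing both legs, which is exactly the case
// the new rule in this diff rewrites to a Cross join plus a Filter.
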
---