Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/22326#discussion_r220562279
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
     }
 }
+
+/**
+ * Correctly handle PythonUDFs that need to access both sides of the join by changing the
+ * join type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy supports only InnerLike and LeftSemi joins: for other join
+        // types, running the join condition as a filter after the join breaks SQL semantics.
+        // If we passed the plan through here, it would still fail later with a PythonUDF
+        // RuntimeException `requires attributes from more than one child`; we throw first
+        // here to give a more readable message.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // If the condition expression contains a Python UDF, it will be moved out of the
+      // new join condition, and the join type will be changed to Cross.
+      logWarning(s"The join condition:$condition of the join plan contains PythonUDF, " +
+        "it will be moved out, and the join will be turned into a cross join when it is " +
+        s"the only condition. The plan is shown below:\n $j")
+      val (udf, rest) =
+        condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
+      val newCondition = if (rest.isEmpty) {
+        Option.empty
+      } else {
+        Some(rest.reduceLeft(And))
+      }
+      val newJoin = j.copy(condition = newCondition)
+      joinType match {
+        case _: InnerLike =>
+          Filter(udf.reduceLeft(And), newJoin)
+        case LeftSemi =>
+          Project(
--- End diff --
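To make the rewrite above concrete, here is a rough before/after sketch of the plan shapes (illustration only, not from the patch; `pyUDF`, `a`, and `b` are invented names):
```
// Illustration only; pyUDF, a and b are invented names.
// An inner join whose condition mixes a PythonUDF with a normal predicate:
//   Join(left, right, Inner, Some(And(pyUDF, EqualTo(a, b))))
// becomes, after this rule:
//   Filter(pyUDF, Join(left, right, Inner, Some(EqualTo(a, b))))
// When pyUDF is the only conjunct, the new condition is None, so the inner
// join degenerates into a cartesian product underneath the Filter.
```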
I tried two ways to implement LeftAnti here:
1. Use `Except(join.left, <left-semi result>, isAll = false)` to simulate it, but it is banned by the strategy, and there is actually no physical plan for `Except` anyway (see the sketch after this list): https://github.com/apache/spark/blob/89671a27e783d77d4bfaec3d422cc8dd468ef04c/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L557-L559
2. Also simulate it with a cross join plus filter, but that can't work when the UDF is the only condition in the anti join, because after the cross join it is hard to roll back to the original semantics. A UDF combined with a normal condition can be simulated by
```
Project(
  j.left.output.map(_.toAttribute),
  Filter(Not(udf.reduceLeft(And)),
    newJoin.copy(joinType = Inner, condition = Some(Not(rest.reduceLeft(And))))))
```
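For reference, a rough sketch of what approach 1 looked like inside the rule's `joinType match`, written against the same `j`, `udf`, and `newJoin` as in the diff above (`semiResult` is just a local name I made up here):
```
case LeftAnti =>
  // Simulate LeftAnti as: left EXCEPT (the left rows that do match), where the
  // matching rows are the left-semi result simulated via inner join + filter + project.
  val semiResult = Project(
    j.left.output.map(_.toAttribute),
    Filter(udf.reduceLeft(And), newJoin.copy(joinType = Inner)))
  Except(j.left, semiResult, isAll = false)
```
This is exactly the shape that gets rejected at the SparkStrategies link above.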
---