Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22326#discussion_r220562279
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * Correctly handle PythonUDFs that need access to both sides of the join by pulling
    + * them out of the join condition; the join type may be changed to Cross.
    + */
    +object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
    +  def hasPythonUDF(expression: Expression): Boolean = {
    +    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
    +  }
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    +    case j @ Join(_, _, joinType, condition)
    +        if condition.isDefined && hasPythonUDF(condition.get) =>
    +      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
    +        // The current strategy supports only InnerLike and LeftSemi joins, because for
    +        // other join types running the join condition as a filter after the join breaks
    +        // SQL semantics. If we passed the plan on, it would still fail with a
    +        // RuntimeException (`requires attributes from more than one child`) for the
    +        // invalid PythonUDF, so we throw here first to give a more readable message.
    +        throw new AnalysisException("Using PythonUDF in join condition of join type" +
    +          s" $joinType is not supported.")
    +      }
    +      // If the condition expression contains a PythonUDF, it is pulled out of the new
    +      // join condition, and the join type may be changed to Cross.
    +      logWarning(s"The join condition:$condition of the join plan contains PythonUDF, " +
    +        "it will be moved out and the join plan will be turned to a cross join " +
    +        s"when it is the only condition. The plan is shown below:\n $j")
    +      val (udf, rest) =
    +        condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
    +      val newCondition = if (rest.isEmpty) {
    +        Option.empty
    +      } else {
    +        Some(rest.reduceLeft(And))
    +      }
    +      val newJoin = j.copy(condition = newCondition)
    +      joinType match {
    +        case _: InnerLike =>
    +          Filter(udf.reduceLeft(And), newJoin)
    +        case LeftSemi =>
    +          Project(
    --- End diff --
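As a toy model of the rule in the diff above (plain Python, not Spark code; the predicate representation is invented for illustration): conjuncts backed by a PythonUDF are pulled out of the join condition to become a post-join filter, and the remainder, if any, stays as the new condition; an empty remainder means the join degenerates into a cross join.

```python
# Toy model of PullOutPythonUDFInJoinCondition (illustrative only, not Spark code).
# A join condition is a list of conjuncts; conjuncts marked as UDF-backed are
# pulled out to a post-join filter, and the rest stay as the join condition.

def pull_out_udf_condition(conjuncts):
    udf = [p for p in conjuncts if p["is_udf"]]
    rest = [p for p in conjuncts if not p["is_udf"]]
    # An empty remainder means no join condition is left: a cross join.
    new_condition = rest or None
    return udf, new_condition

conjuncts = [
    {"expr": "a = b", "is_udf": False},
    {"expr": "pyUDF(a, b)", "is_udf": True},
]
udf, new_condition = pull_out_udf_condition(conjuncts)
print([p["expr"] for p in udf])            # ['pyUDF(a, b)'] -> post-join Filter
print([p["expr"] for p in new_condition])  # ['a = b'] -> remaining join condition

# With only a UDF conjunct, the join condition disappears entirely (cross join).
_, cond = pull_out_udf_condition([{"expr": "pyUDF(a, b)", "is_udf": True}])
print(cond)                                # None
```

This mirrors the `partition(hasPythonUDF)` split in the diff; the dict-based predicates stand in for Catalyst `Expression` conjuncts.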
    
    I tried two ways to implement LeftAnti here:
    1. Use `Except(join.left, left semi result, isAll = false)` to simulate it. This is banned by the strategy, and there is actually no physical plan for `Except`: https://github.com/apache/spark/blob/89671a27e783d77d4bfaec3d422cc8dd468ef04c/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L557-L559
    2. Also use a cross join plus a filter to simulate it, but this may not be reachable when the anti join condition contains only a UDF, because after the cross join it is hard to roll back to the original semantics. A UDF plus a normal condition can be simulated by:
    ```
    Project(
      j.left.output.map(_.toAttribute),
      Filter(Not(udf.reduceLeft(And)),
        newJoin.copy(joinType = Inner, condition = Some(Not(rest.reduceLeft(And))))))
    ```
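For reference, the LeftSemi rewrite from the diff (run the join without the UDF condition, filter by the UDF, project the left side) can be sanity-checked with a toy Python model; the data, the `py_udf` stand-in, and the dedup step are illustrative, not Spark behavior:

```python
left = [1, 2, 3]
right = [2, 3, 4]

def py_udf(l, r):
    # Stand-in for a Python UDF that needs columns from both join sides.
    return l == r

# Left semi join: keep each left row that has at least one match on the right.
semi = [l for l in left if any(py_udf(l, r) for r in right)]

# Rewrite: cross join (no remaining condition), filter by the UDF, project left.
# The set() dedup is a simplification for this toy model.
cross = [(l, r) for l in left for r in right]
rewritten = sorted({l for l, r in cross if py_udf(l, r)})

print(semi)       # [2, 3]
print(rewritten)  # [2, 3]
```

Both paths keep exactly the left rows with a matching right row, which is why the cross-join-plus-filter shape works for LeftSemi but, as noted above, not straightforwardly for LeftAnti.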


---
