Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22326#discussion_r220198104
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
    @@ -152,3 +153,60 @@ object EliminateOuterJoin extends Rule[LogicalPlan] 
with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, 
j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * Correctly handle PythonUDF which need access both side of join side by 
changing the new join
    + * type to Cross.
    + */
    +object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with 
PredicateHelper {
    +  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    +    case j @ Join(_, _, joinType, condition)
    +      if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(
    +        _.collectFirst { case udf: PythonUDF => udf }.isDefined) =>
    +      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
    +        // The current strategy only support InnerLike and LeftSemi join 
because for other type,
    +        // it breaks SQL semantic if we run the join condition as a filter 
after join. If we pass
    +        // the plan here, it'll still get a an invalid PythonUDF 
RuntimeException with message
    +        // `requires attributes from more than one child`, we throw 
firstly here for better
    +        // readable information.
    +        throw new AnalysisException("Using PythonUDF in join condition of 
join type" +
    +          s" $joinType is not supported.")
    +      }
    +      if (SQLConf.get.crossJoinEnabled) {
    +        // if condition expression contains python udf, it will be moved 
out from
    +        // the new join conditions, and the join type will be changed to 
CrossJoin.
    +        logWarning(s"The join condition:$condition of the join plan 
contains " +
    +          "PythonUDF, it will be moved out and the join plan will be " +
    +          s"turned to cross join. This plan shows below:\n $j")
    +        val (udf, rest) =
    +          condition.map(splitConjunctivePredicates).get.partition(
    +            _.collectFirst { case udf: PythonUDF => udf }.isDefined)
    +        val newCondition = if (rest.isEmpty) {
    +          Option.empty
    +        } else {
    +          Some(rest.reduceLeft(And))
    +        }
    +        val newJoin = j.copy(joinType = Cross, condition = newCondition)
    +        joinType match {
    +          case _: InnerLike =>
    +            Filter(udf.reduceLeft(And), newJoin)
    +          case LeftSemi =>
    +            Project(
    +              j.left.output.map(_.toAttribute), 
Filter(udf.reduceLeft(And), newJoin))
    +          case _ =>
    +            throw new AnalysisException("Using PythonUDF in join condition 
of join type" +
    +              s" $joinType is not supported.")
    +        }
    +      } else {
    +        // if the crossJoinEnabled is false, a RuntimeException will be 
thrown later while
    +        // the PythonUDF need to access both side of join, we throw 
firstly here for better
    +        // readable information.
    +        throw new AnalysisException(s"Detected the join 
condition:$condition of this join " +
    --- End diff --
    
    I agree the error message is better than just saying cross join is 
detected. But I'm worried about duplicating the cross join detection logic in 
multiple rules. e.g. what if after pulling out python udf from join condition, 
we still have other join conditions so it's not cross join?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to