Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22326#discussion_r220198104
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
---
@@ -152,3 +153,60 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
}
}
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+ override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ case j @ Join(_, _, joinType, condition)
+ if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(
+ _.collectFirst { case udf: PythonUDF => udf }.isDefined) =>
+ if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+ // The current strategy only support InnerLike and LeftSemi join because for other type,
+ // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+ // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+ // `requires attributes from more than one child`, we throw firstly here for better
+ // readable information.
+ throw new AnalysisException("Using PythonUDF in join condition of join type" +
+ s" $joinType is not supported.")
+ }
+ if (SQLConf.get.crossJoinEnabled) {
+ // if condition expression contains python udf, it will be moved out from
+ // the new join conditions, and the join type will be changed to CrossJoin.
+ logWarning(s"The join condition:$condition of the join plan contains " +
+ "PythonUDF, it will be moved out and the join plan will be " +
+ s"turned to cross join. This plan shows below:\n $j")
+ val (udf, rest) =
+ condition.map(splitConjunctivePredicates).get.partition(
+ _.collectFirst { case udf: PythonUDF => udf }.isDefined)
+ val newCondition = if (rest.isEmpty) {
+ Option.empty
+ } else {
+ Some(rest.reduceLeft(And))
+ }
+ val newJoin = j.copy(joinType = Cross, condition = newCondition)
+ joinType match {
+ case _: InnerLike =>
+ Filter(udf.reduceLeft(And), newJoin)
+ case LeftSemi =>
+ Project(
+ j.left.output.map(_.toAttribute), Filter(udf.reduceLeft(And), newJoin))
+ case _ =>
+ throw new AnalysisException("Using PythonUDF in join condition of join type" +
+ s" $joinType is not supported.")
+ }
+ } else {
+ // if the crossJoinEnabled is false, a RuntimeException will be thrown later while
+ // the PythonUDF need to access both side of join, we throw firstly here for better
+ // readable information.
+ throw new AnalysisException(s"Detected the join condition:$condition of this join " +
--- End diff --
I agree the error message is better than just saying a cross join is
detected. But I'm worried about duplicating the cross join detection logic in
multiple rules. For example, what if, after pulling the Python UDF out of the
join condition, other join conditions remain, so the join is not a cross join?
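
To make the concern concrete, here is a minimal sketch of the case in question. It uses a toy expression model, not Spark's Catalyst classes: `Expr`, `Attr`, `EqualTo`, `PyUdf`, `And`, and `JoinConditionSketch` are all hypothetical names introduced only for illustration. It shows that after the UDF conjuncts are split off, the remaining condition can be non-empty, in which case the join is not really a cross join.

```scala
// Toy model only. These types are hypothetical stand-ins, not Catalyst classes.
sealed trait Expr
case class Attr(name: String) extends Expr
case class EqualTo(left: Expr, right: Expr) extends Expr
case class PyUdf(name: String, args: Seq[Expr]) extends Expr
case class And(left: Expr, right: Expr) extends Expr

object JoinConditionSketch {
  // Flatten nested ANDs into a flat list of conjuncts.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // True if the expression contains a (toy) Python UDF anywhere.
  def containsPyUdf(e: Expr): Boolean = e match {
    case _: PyUdf      => true
    case EqualTo(l, r) => containsPyUdf(l) || containsPyUdf(r)
    case And(l, r)     => containsPyUdf(l) || containsPyUdf(r)
    case _: Attr       => false
  }

  // Split a join condition into (UDF conjuncts, remaining join condition).
  def pullOutUdfs(condition: Option[Expr]): (Seq[Expr], Option[Expr]) = {
    val (udfs, rest) =
      condition.map(splitConjuncts).getOrElse(Nil).partition(containsPyUdf)
    (udfs, rest.reduceLeftOption(And(_, _)))
  }

  // Only a join with no remaining condition is a true cross join.
  def isTrueCrossJoin(remaining: Option[Expr]): Boolean = remaining.isEmpty

  def main(args: Array[String]): Unit = {
    // a.id = b.id AND f(a.x, b.y)
    val cond = And(
      EqualTo(Attr("a.id"), Attr("b.id")),
      PyUdf("f", Seq(Attr("a.x"), Attr("b.y"))))
    val (udfs, rest) = pullOutUdfs(Some(cond))
    // The equality conjunct survives, so this is not a true cross join.
    println(s"udfs=$udfs rest=$rest crossJoin=${isTrueCrossJoin(rest)}")
  }
}
```

In this toy model, a condition such as `a.id = b.id AND f(a.x, b.y)` leaves `a.id = b.id` behind, so a rule that decides "cross join" only from the presence of a Python UDF would disagree with a check based on the remaining condition.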