[GitHub] spark pull request #23153: [SPARK-26147][SQL] only pull out unevaluable pyth...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23153#discussion_r238082589

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -155,19 +155,20 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
     }

     /**
    - * PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF
    - * and pull them out from join condition. For python udf accessing attributes from only one side,
    - * they are pushed down by operation push down rules. If not (e.g. user disables filter push
    - * down rules), we need to pull them out in this rule too.
    + * PythonUDF in join condition can't be evaluated if it refers to attributes from both join sides.
    + * See `ExtractPythonUDFs` for details. This rule will detect un-evaluable PythonUDF and pull them
    + * out from join condition.
      */
     object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
    -  def hasPythonUDF(expression: Expression): Boolean = {
    -    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
    +
    +  private def hasUnevaluablePythonUDF(expr: Expression, j: Join): Boolean = {
    +    expr.find { e =>
    +      PythonUDF.isScalarPythonUDF(e) && !canEvaluate(e, j.left) && !canEvaluate(e, j.right)
    --- End diff --

    Only scalar UDFs can appear in a join condition, so changing this to `e.isInstanceOf[PythonUDF]` would be equivalent.

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23153#discussion_r237944306

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -155,19 +155,20 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
     }

     /**
    - * PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF
    - * and pull them out from join condition. For python udf accessing attributes from only one side,
    - * they are pushed down by operation push down rules. If not (e.g. user disables filter push
    - * down rules), we need to pull them out in this rule too.
    + * PythonUDF in join condition can't be evaluated if it refers to attributes from both join sides.
    + * See `ExtractPythonUDFs` for details. This rule will detect un-evaluable PythonUDF and pull them
    + * out from join condition.
      */
     object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
    -  def hasPythonUDF(expression: Expression): Boolean = {
    -    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
    +
    +  private def hasUnevaluablePythonUDF(expr: Expression, j: Join): Boolean = {
    +    expr.find { e =>
    +      PythonUDF.isScalarPythonUDF(e) && !canEvaluate(e, j.left) && !canEvaluate(e, j.right)
    --- End diff --

    We might need a comment to explain why we only pull out the scalar `PythonUDF`.

Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/23153

Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23153#discussion_r236743539

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -155,19 +155,20 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
     }

     /**
    - * PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF
    - * and pull them out from join condition. For python udf accessing attributes from only one side,
    - * they are pushed down by operation push down rules. If not (e.g. user disables filter push
    - * down rules), we need to pull them out in this rule too.
    + * PythonUDF in join condition can't be evaluated if it refers to attributes from both join sides.
    + * See `ExtractPythonUDFs` for details. This rule will detect un-evaluable PythonUDF and pull them
    + * out from join condition.
      */
     object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
    -  def hasPythonUDF(expression: Expression): Boolean = {
    -    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
    +
    +  private def hasUnevaluablePythonUDF(expr: Expression, j: Join): Boolean = {
    +    expr.find { e =>
    +      PythonUDF.isScalarPythonUDF(e) && !canEvaluate(e, j.left) && !canEvaluate(e, j.right)
    +    }.isDefined
       }

       override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    -    case j @ Join(_, _, joinType, condition)
    -        if condition.isDefined && hasPythonUDF(condition.get) =>
    +    case j @ Join(_, _, joinType, Some(cond)) if hasUnevaluablePythonUDF(cond, j) =>
    --- End diff --

    Following the rule change, we need to modify the tests in `PullOutPythonUDFInJoinConditionSuite`: they should also construct a dummy Python UDF that refers to attributes from both sides.

Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23153#discussion_r236647128

    --- Diff: python/pyspark/sql/tests/test_udf.py ---
    @@ -209,6 +209,18 @@ def test_udf_in_join_condition(self):
             with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
                 self.assertEqual(df.collect(), [Row(a=1, b=1)])

    +    def test_udf_in_left_outer_join_condition(self):
    +        # regression test for SPARK-26147
    +        from pyspark.sql.functions import udf, col
    +        left = self.spark.createDataFrame([Row(a=1)])
    +        right = self.spark.createDataFrame([Row(b=1)])
    +        f = udf(lambda a: str(a), StringType())
    +        # The join condition can't be pushed down, as it refers to attributes from both sides.
    +        # The Python UDF only refer to attributes from one side, so it's evaluable.
    +        df = left.join(right, f("a") == col("b").cast("string"), how = "left_outer")
    --- End diff --

    style nit: how="left_outer"
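
The regression test quoted above relies on the UDF needing only the left-side column `a`, even though the full join condition compares both sides. As a rough illustration in plain Python (no Spark involved), the sketch below evaluates the one-sided function per left row first and then performs a left outer join on the comparison. The helper `left_outer_join` and the extra unmatched row `a=2` are hypothetical and added only for illustration; this is not how Spark executes the query.

```python
def left_outer_join(left_rows, right_rows, udf):
    """Toy left outer join on `udf(a) == str(b)`; a sketch, not Spark's execution model."""
    # Step 1: the UDF only needs the left column `a`, so it can be
    # evaluated per left row before any right row is looked at.
    left_with_udf = [(row, udf(row["a"])) for row in left_rows]

    result = []
    for row, udf_value in left_with_udf:
        # Step 2: the comparison refers to both sides, so it stays in the join.
        matches = [r for r in right_rows if udf_value == str(r["b"])]
        if matches:
            result.extend((row["a"], r["b"]) for r in matches)
        else:
            # Left outer join: keep unmatched left rows, padded with None.
            result.append((row["a"], None))
    return result


f = lambda a: str(a)  # stands in for the Python UDF in the test
rows = left_outer_join([{"a": 1}, {"a": 2}], [{"b": 1}], f)
print(rows)  # [(1, 1), (2, None)]
```

The point of the sketch is only that a UDF referring to one side can be computed without seeing the other side, so the join condition itself need not be rewritten.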

GitHub user cloud-fan opened a pull request:

    https://github.com/apache/spark/pull/23153

    [SPARK-26147][SQL] only pull out unevaluable python udf from join condition

    ## What changes were proposed in this pull request?

    https://github.com/apache/spark/pull/22326 mistakenly treated every Python UDF in a join condition as unevaluable. Only Python UDFs that refer to attributes from both join sides are unevaluable. This PR fixes that mistake.

    ## How was this patch tested?

    a new test

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark join

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23153.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #23153

commit cb195cf4b08aef9f4beb3ed4c18580fe5a76c65c
Author: Wenchen Fan
Date:   2018-11-27T11:19:12Z

    only pull out unevaluable python udf from join condition
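
The distinction the PR description draws can be sketched with a toy expression tree in plain Python. This is a minimal model, not Spark's Catalyst classes: `Expr`, `can_evaluate`, and `has_unevaluable_python_udf` are hypothetical names that loosely mirror the Scala in the diff, and the sketch assumes "can evaluate on a side" means that every attribute the expression refers to is produced by that side.

```python
from dataclasses import dataclass
from typing import FrozenSet, Iterator, Tuple


@dataclass(frozen=True)
class Expr:
    """Toy expression node (hypothetical; NOT Spark's Expression class)."""
    name: str
    children: Tuple["Expr", ...] = ()
    attr: str = ""                # non-empty only for attribute references
    is_python_udf: bool = False

    def references(self) -> FrozenSet[str]:
        """All attribute names this expression refers to."""
        refs = {self.attr} if self.attr else set()
        for child in self.children:
            refs |= child.references()
        return frozenset(refs)

    def walk(self) -> Iterator["Expr"]:
        """Yield this node and every descendant."""
        yield self
        for child in self.children:
            yield from child.walk()


def can_evaluate(expr: Expr, side_output: FrozenSet[str]) -> bool:
    # Assumption: evaluable on a side iff all referenced attributes come from it.
    return expr.references() <= side_output


def has_unevaluable_python_udf(cond: Expr, left_output, right_output) -> bool:
    # A Python UDF is a problem only if neither join side alone can evaluate it.
    return any(
        e.is_python_udf
        and not can_evaluate(e, left_output)
        and not can_evaluate(e, right_output)
        for e in cond.walk()
    )


a = Expr("a", attr="a")
b = Expr("b", attr="b")
left_out, right_out = frozenset({"a"}), frozenset({"b"})

# f(a) == cast(b): the condition spans both sides, but the UDF only needs `a`.
one_sided = Expr("eq", (Expr("f", (a,), is_python_udf=True), Expr("cast", (b,))))
# g(a, b): the UDF itself needs attributes from both sides.
two_sided = Expr("g", (a, b), is_python_udf=True)

print(has_unevaluable_python_udf(one_sided, left_out, right_out))  # False
print(has_unevaluable_python_udf(two_sided, left_out, right_out))  # True
```

Under this model, only the `g(a, b)` case would need to be pulled out of the join condition; the `f(a)` case is left alone.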