This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new b86ea0f [SPARK-33733][SQL][3.0] PullOutNondeterministic should check and collect deterministic field b86ea0f is described below commit b86ea0f895599e621f7ed1aa4213378f9b5cf621 Author: ulysses-you <ulyssesyo...@gmail.com> AuthorDate: Thu Dec 17 08:44:42 2020 +0000 [SPARK-33733][SQL][3.0] PullOutNondeterministic should check and collect deterministic field backport [#30703](https://github.com/apache/spark/pull/30703) for branch-3.0. ### What changes were proposed in this pull request? The deterministic field is wider than `NonDeterministic`, we should keep the same range between pull out and check analysis. ### Why are the changes needed? For example ``` select * from values(1), (4) as t(c1) order by java_method('java.lang.Math', 'abs', c1) ``` We will get exception since `java_method` deterministic field is false but not a `NonDeterministic` ``` Exception in thread "main" org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in Project, Filter, Aggregate or Window, found: java_method('java.lang.Math', 'abs', t.`c1`) ASC NULLS FIRST in operator Sort [java_method(java.lang.Math, abs, c1#1) ASC NULLS FIRST], true ;; ``` ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? Add test. Closes #30771 from ulysses-you/SPARK-33733-branch-3.0. 
Authored-by: ulysses-you <ulyssesyo...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/analysis/Analyzer.scala | 5 ++++- .../expressions/CallMethodViaReflection.scala | 6 +++--- .../sql/catalyst/analysis/AnalysisSuite.scala | 22 ++++++++++++++++++++++ 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7795d70..1307fc5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2711,7 +2711,10 @@ class Analyzer( private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = { exprs.filterNot(_.deterministic).flatMap { expr => - val leafNondeterministic = expr.collect { case n: Nondeterministic => n } + val leafNondeterministic = expr.collect { + case n: Nondeterministic => n + case udf: UserDefinedExpression if !udf.deterministic => udf + } leafNondeterministic.distinct.map { e => val ne = e match { case n: NamedExpression => n diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala index e6a4c8f..6d711af 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala @@ -53,7 +53,7 @@ import org.apache.spark.util.Utils a5cf6c42-0c85-418f-af6c-3e4e5b1328f2 """) case class CallMethodViaReflection(children: Seq[Expression]) - extends Expression with CodegenFallback { + extends Nondeterministic with CodegenFallback { override def prettyName: String = 
getTagValue(FunctionRegistry.FUNC_ALIAS).getOrElse("reflect") @@ -76,11 +76,11 @@ case class CallMethodViaReflection(children: Seq[Expression]) } } - override lazy val deterministic: Boolean = false override def nullable: Boolean = true override val dataType: DataType = StringType + override protected def initializeInternal(partitionIndex: Int): Unit = {} - override def eval(input: InternalRow): Any = { + override protected def evalInternal(input: InternalRow): Any = { var i = 0 while (i < argExprs.length) { buffer(i) = argExprs(i).eval(input).asInstanceOf[Object] diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index b432747..63c2779 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -951,4 +951,26 @@ class AnalysisSuite extends AnalysisTest with Matchers { s"please set '${SQLConf.ANALYZER_MAX_ITERATIONS.key}' to a larger value.")) } } + + test("SPARK-33733: PullOutNondeterministic should check and collect deterministic field") { + val reflect = + CallMethodViaReflection(Seq("java.lang.Math", "abs", testRelation.output.head)) + val udf = ScalaUDF( + (s: String) => s, + StringType, + Literal.create(null, StringType) :: Nil, + Option(ExpressionEncoder[String]().resolveAndBind()) :: Nil, + udfDeterministic = false) + + Seq(reflect, udf).foreach { e: Expression => + val plan = Sort(Seq(e.asc), false, testRelation) + val projected = Alias(e, "_nondeterministic")() + val expect = + Project(testRelation.output, + Sort(Seq(projected.toAttribute.asc), false, + Project(testRelation.output :+ projected, + testRelation))) + checkAnalysis(plan, expect) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For 
additional commands, e-mail: commits-h...@spark.apache.org