This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b86ea0f  [SPARK-33733][SQL][3.0] PullOutNondeterministic should check 
and collect deterministic field
b86ea0f is described below

commit b86ea0f895599e621f7ed1aa4213378f9b5cf621
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Thu Dec 17 08:44:42 2020 +0000

    [SPARK-33733][SQL][3.0] PullOutNondeterministic should check and collect 
deterministic field
    
    Backport of [#30703](https://github.com/apache/spark/pull/30703) for
branch-3.0.
    
    ### What changes were proposed in this pull request?
    
    The deterministic field is wider than `NonDeterministic`; we should keep the
same range between the pull-out and the check analysis.
    
    ### Why are the changes needed?
    
    For example
    ```
    select * from values(1), (4) as t(c1) order by 
java_method('java.lang.Math', 'abs', c1)
    ```
    
    We will get an exception, since `java_method`'s deterministic field is false
but it is not a `NonDeterministic`
    ```
    Exception in thread "main" org.apache.spark.sql.AnalysisException: 
nondeterministic expressions are only allowed in
    Project, Filter, Aggregate or Window, found:
     java_method('java.lang.Math', 'abs', t.`c1`) ASC NULLS FIRST
    in operator Sort [java_method(java.lang.Math, abs, c1#1) ASC NULLS FIRST], 
true
                   ;;
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    
    ### How was this patch tested?
    
    Add test.
    
    Closes #30771 from ulysses-you/SPARK-33733-branch-3.0.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  5 ++++-
 .../expressions/CallMethodViaReflection.scala      |  6 +++---
 .../sql/catalyst/analysis/AnalysisSuite.scala      | 22 ++++++++++++++++++++++
 3 files changed, 29 insertions(+), 4 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7795d70..1307fc5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2711,7 +2711,10 @@ class Analyzer(
 
     private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, 
NamedExpression] = {
       exprs.filterNot(_.deterministic).flatMap { expr =>
-        val leafNondeterministic = expr.collect { case n: Nondeterministic => 
n }
+        val leafNondeterministic = expr.collect {
+          case n: Nondeterministic => n
+          case udf: UserDefinedExpression if !udf.deterministic => udf
+        }
         leafNondeterministic.distinct.map { e =>
           val ne = e match {
             case n: NamedExpression => n
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala
index e6a4c8f..6d711af 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala
@@ -53,7 +53,7 @@ import org.apache.spark.util.Utils
        a5cf6c42-0c85-418f-af6c-3e4e5b1328f2
   """)
 case class CallMethodViaReflection(children: Seq[Expression])
-  extends Expression with CodegenFallback {
+  extends Nondeterministic with CodegenFallback {
 
   override def prettyName: String = 
getTagValue(FunctionRegistry.FUNC_ALIAS).getOrElse("reflect")
 
@@ -76,11 +76,11 @@ case class CallMethodViaReflection(children: 
Seq[Expression])
     }
   }
 
-  override lazy val deterministic: Boolean = false
   override def nullable: Boolean = true
   override val dataType: DataType = StringType
+  override protected def initializeInternal(partitionIndex: Int): Unit = {}
 
-  override def eval(input: InternalRow): Any = {
+  override protected def evalInternal(input: InternalRow): Any = {
     var i = 0
     while (i < argExprs.length) {
       buffer(i) = argExprs(i).eval(input).asInstanceOf[Object]
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index b432747..63c2779 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -951,4 +951,26 @@ class AnalysisSuite extends AnalysisTest with Matchers {
         s"please set '${SQLConf.ANALYZER_MAX_ITERATIONS.key}' to a larger 
value."))
     }
   }
+
+  test("SPARK-33733: PullOutNondeterministic should check and collect 
deterministic field") {
+    val reflect =
+      CallMethodViaReflection(Seq("java.lang.Math", "abs", 
testRelation.output.head))
+    val udf = ScalaUDF(
+      (s: String) => s,
+      StringType,
+      Literal.create(null, StringType) :: Nil,
+      Option(ExpressionEncoder[String]().resolveAndBind()) :: Nil,
+      udfDeterministic = false)
+
+    Seq(reflect, udf).foreach { e: Expression =>
+      val plan = Sort(Seq(e.asc), false, testRelation)
+      val projected = Alias(e, "_nondeterministic")()
+      val expect =
+        Project(testRelation.output,
+          Sort(Seq(projected.toAttribute.asc), false,
+            Project(testRelation.output :+ projected,
+              testRelation)))
+      checkAnalysis(plan, expect)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to