This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 97efe0efb26 [SPARK-39162][SQL] Jdbc dialect should decide which 
function could be pushed down
97efe0efb26 is described below

commit 97efe0efb2665833910e13eb7bae16cc1ad4e0fa
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Sat May 14 16:28:21 2022 -0700

    [SPARK-39162][SQL] Jdbc dialect should decide which function could be 
pushed down
    
    ### What changes were proposed in this pull request?
    Regardless of whether the functions are ANSI-standard, individual databases 
may or may not support them.
    So we should add a new API to `JdbcDialect` so that each JDBC dialect can 
decide which functions may be pushed down.
    
    ### Why are the changes needed?
    Make function push-down more flexible.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    New feature.
    
    ### How was this patch tested?
    Existing tests.
    
    Closes #36521 from beliefer/SPARK-39162.
    
    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: huaxingao <huaxin_...@apple.com>
---
 .../spark/sql/errors/QueryCompilationErrors.scala  |  4 ----
 .../org/apache/spark/sql/jdbc/H2Dialect.scala      | 28 ++++------------------
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   | 19 +++++++++++++++
 3 files changed, 23 insertions(+), 28 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 3b167eeb417..efb4389ec50 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2404,10 +2404,6 @@ object QueryCompilationErrors extends QueryErrorsBase {
       "Sinks cannot request distribution and ordering in continuous execution 
mode")
   }
 
-  def noSuchFunctionError(database: String, funcInfo: String): Throwable = {
-    new AnalysisException(s"$database does not support function: $funcInfo")
-  }
-
   // Return a more descriptive error message if the user tries to nest a 
DEFAULT column reference
   // inside some other expression (such as DEFAULT + 1) in an INSERT INTO 
command's VALUES list;
   // this is not allowed.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 56cadbe8e2c..4a88203ec59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -20,13 +20,9 @@ package org.apache.spark.sql.jdbc
 import java.sql.{SQLException, Types}
 import java.util.Locale
 
-import scala.util.control.NonFatal
-
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, 
NoSuchTableException, TableAlreadyExistsException}
-import org.apache.spark.sql.connector.expressions.Expression
 import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, 
GeneralAggregateFunc}
-import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, 
DecimalType, ShortType, StringType}
 
@@ -34,27 +30,11 @@ private object H2Dialect extends JdbcDialect {
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:h2")
 
-  class H2SQLBuilder extends JDBCSQLBuilder {
-    override def visitSQLFunction(funcName: String, inputs: Array[String]): 
String = {
-      funcName match {
-        case "WIDTH_BUCKET" =>
-          val functionInfo = super.visitSQLFunction(funcName, inputs)
-          throw QueryCompilationErrors.noSuchFunctionError("H2", functionInfo)
-        case _ => super.visitSQLFunction(funcName, inputs)
-      }
-    }
-  }
+  private val supportedFunctions =
+    Set("ABS", "COALESCE", "LN", "EXP", "POWER", "SQRT", "FLOOR", "CEIL")
 
-  override def compileExpression(expr: Expression): Option[String] = {
-    val h2SQLBuilder = new H2SQLBuilder()
-    try {
-      Some(h2SQLBuilder.build(expr))
-    } catch {
-      case NonFatal(e) =>
-        logWarning("Error occurs while compiling V2 expression", e)
-        None
-    }
-  }
+  override def isSupportedFunction(funcName: String): Boolean =
+    supportedFunctions.contains(funcName)
 
   override def compileAggregate(aggFunction: AggregateFunc): Option[String] = {
     super.compileAggregate(aggFunction).orElse(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 0ef23073a27..e1883e4e7f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -240,8 +240,27 @@ abstract class JdbcDialect extends Serializable with 
Logging{
         
getJDBCType(dataType).map(_.databaseTypeDefinition).getOrElse(dataType.typeName)
       s"CAST($l AS $databaseTypeDefinition)"
     }
+
+    override def visitSQLFunction(funcName: String, inputs: Array[String]): 
String = {
+      if (isSupportedFunction(funcName)) {
+        s"""$funcName(${inputs.mkString(", ")})"""
+      } else {
+        // The framework will catch the error and give up the push-down.
+        // Please see `JdbcDialect.compileExpression(expr: Expression)` for 
more details.
+        throw new UnsupportedOperationException(
+          s"${this.getClass.getSimpleName} does not support function: 
$funcName")
+      }
+    }
   }
 
+  /**
+   * Returns whether the database supports function.
+   * @param funcName Upper-cased function name
+   * @return True if the database supports function.
+   */
+  @Since("3.3.0")
+  def isSupportedFunction(funcName: String): Boolean = false
+
   /**
    * Converts V2 expression to String representing a SQL expression.
    * @param expr The V2 expression to be converted.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to