This is an automated email from the ASF dual-hosted git repository. huaxingao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 97efe0efb26 [SPARK-39162][SQL] Jdbc dialect should decide which function could be pushed down 97efe0efb26 is described below commit 97efe0efb2665833910e13eb7bae16cc1ad4e0fa Author: Jiaan Geng <belie...@163.com> AuthorDate: Sat May 14 16:28:21 2022 -0700 [SPARK-39162][SQL] Jdbc dialect should decide which function could be pushed down ### What changes were proposed in this pull request? Regardless of whether a function is ANSI-standard or not, it is uncertain whether a given database actually supports it. So we should add a new API to `JdbcDialect` so that each JDBC dialect decides which functions can be pushed down. ### Why are the changes needed? To make function push-down more flexible. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? Existing tests. Closes #36521 from beliefer/SPARK-39162. Authored-by: Jiaan Geng <belie...@163.com> Signed-off-by: huaxingao <huaxin_...@apple.com> --- .../spark/sql/errors/QueryCompilationErrors.scala | 4 ---- .../org/apache/spark/sql/jdbc/H2Dialect.scala | 28 ++++------------------ .../org/apache/spark/sql/jdbc/JdbcDialects.scala | 19 +++++++++++++++ 3 files changed, 23 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 3b167eeb417..efb4389ec50 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -2404,10 +2404,6 @@ object QueryCompilationErrors extends QueryErrorsBase { "Sinks cannot request distribution and ordering in continuous execution mode") } - def noSuchFunctionError(database: String, funcInfo: String): Throwable = { - new AnalysisException(s"$database does not support function: $funcInfo") - } - // Return a more 
descriptive error message if the user tries to nest a DEFAULT column reference // inside some other expression (such as DEFAULT + 1) in an INSERT INTO command's VALUES list; // this is not allowed. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index 56cadbe8e2c..4a88203ec59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -20,13 +20,9 @@ package org.apache.spark.sql.jdbc import java.sql.{SQLException, Types} import java.util.Locale -import scala.util.control.NonFatal - import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} -import org.apache.spark.sql.connector.expressions.Expression import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, GeneralAggregateFunc} -import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, ShortType, StringType} @@ -34,27 +30,11 @@ private object H2Dialect extends JdbcDialect { override def canHandle(url: String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:h2") - class H2SQLBuilder extends JDBCSQLBuilder { - override def visitSQLFunction(funcName: String, inputs: Array[String]): String = { - funcName match { - case "WIDTH_BUCKET" => - val functionInfo = super.visitSQLFunction(funcName, inputs) - throw QueryCompilationErrors.noSuchFunctionError("H2", functionInfo) - case _ => super.visitSQLFunction(funcName, inputs) - } - } - } + private val supportedFunctions = + Set("ABS", "COALESCE", "LN", "EXP", "POWER", "SQRT", "FLOOR", "CEIL") - override def compileExpression(expr: Expression): Option[String] = { - val h2SQLBuilder = new H2SQLBuilder() - try 
{ - Some(h2SQLBuilder.build(expr)) - } catch { - case NonFatal(e) => - logWarning("Error occurs while compiling V2 expression", e) - None - } - } + override def isSupportedFunction(funcName: String): Boolean = + supportedFunctions.contains(funcName) override def compileAggregate(aggFunction: AggregateFunc): Option[String] = { super.compileAggregate(aggFunction).orElse( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 0ef23073a27..e1883e4e7f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -240,8 +240,27 @@ abstract class JdbcDialect extends Serializable with Logging{ getJDBCType(dataType).map(_.databaseTypeDefinition).getOrElse(dataType.typeName) s"CAST($l AS $databaseTypeDefinition)" } + + override def visitSQLFunction(funcName: String, inputs: Array[String]): String = { + if (isSupportedFunction(funcName)) { + s"""$funcName(${inputs.mkString(", ")})""" + } else { + // The framework will catch the error and give up the push-down. + // Please see `JdbcDialect.compileExpression(expr: Expression)` for more details. + throw new UnsupportedOperationException( + s"${this.getClass.getSimpleName} does not support function: $funcName") + } + } } + /** + * Returns whether the database supports function. + * @param funcName Upper-cased function name + * @return True if the database supports function. + */ + @Since("3.3.0") + def isSupportedFunction(funcName: String): Boolean = false + /** * Converts V2 expression to String representing a SQL expression. * @param expr The V2 expression to be converted. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org