This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 4fa447c Revert "[SPARK-30808][SQL] Enable Java 8 time API in Thrift server" 4fa447c is described below commit 4fa447cabc1c09170edd9da0e586660a7ae0db74 Author: Kent Yao <yaooq...@hotmail.com> AuthorDate: Tue Mar 3 14:21:20 2020 +0800 Revert "[SPARK-30808][SQL] Enable Java 8 time API in Thrift server" This reverts commit afaeb29599593f021c9ea47e52f8c70013a4afef. ### What changes were proposed in this pull request? Based on the result and comment from https://github.com/apache/spark/pull/27552#discussion_r385531744 In the hive module, the server side provides datetime values simply by calling `value.toString`, and the client side regenerates the results in `HiveBaseResultSet` with `java.sql.Date(Timestamp).valueOf`. There will be an inconsistency between client and server if we use the Java 8 APIs. ### Why are the changes needed? The change is still not clear enough. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Nah Closes #27733 from yaooqinn/SPARK-30808. 
Authored-by: Kent Yao <yaooq...@hotmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 1fac06c4307c8c7a5a48a50952d48ee5b9ebccb2) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../apache/spark/sql/execution/HiveResult.scala | 61 ++++++++-------------- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 3 +- .../org/apache/spark/sql/SQLQueryTestSuite.scala | 2 +- .../spark/sql/execution/HiveResultSuite.scala | 25 +++++---- .../SparkExecuteStatementOperation.scala | 10 +--- .../sql/hive/thriftserver/SparkSQLDriver.scala | 5 +- .../sql/hive/execution/HiveComparisonTest.scala | 4 +- 7 files changed, 43 insertions(+), 67 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala index b191840..5a2f16d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala @@ -21,10 +21,9 @@ import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} import java.time.{Instant, LocalDate} -import org.apache.spark.sql.{Dataset, Row} +import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter} import org.apache.spark.sql.execution.command.{DescribeCommandBase, ExecutedCommandExec, ShowTablesCommand} -import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.CalendarInterval @@ -37,43 +36,27 @@ object HiveResult { * Returns the result as a hive compatible sequence of strings. This is used in tests and * `SparkSQLDriver` for CLI applications. 
*/ - def hiveResultString(ds: Dataset[_]): Seq[String] = { - val executedPlan = ds.queryExecution.executedPlan - executedPlan match { - case ExecutedCommandExec(_: DescribeCommandBase) => - // If it is a describe command for a Hive table, we want to have the output format - // be similar with Hive. - executedPlan.executeCollectPublic().map { - case Row(name: String, dataType: String, comment) => - Seq(name, dataType, - Option(comment.asInstanceOf[String]).getOrElse("")) - .map(s => String.format(s"%-20s", s)) - .mkString("\t") - } - // SHOW TABLES in Hive only output table names, - // while ours output database, table name, isTemp. - case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended => - command.executeCollect().map(_.getString(1)) - case _ => - val sessionWithJava8DatetimeEnabled = { - val cloned = ds.sparkSession.cloneSession() - cloned.conf.set(SQLConf.DATETIME_JAVA8API_ENABLED.key, true) - cloned - } - sessionWithJava8DatetimeEnabled.withActive { - // We cannot collect the original dataset because its encoders could be created - // with disabled Java 8 date-time API. - val result: Seq[Seq[Any]] = Dataset.ofRows(ds.sparkSession, ds.logicalPlan) - .queryExecution - .executedPlan - .executeCollectPublic().map(_.toSeq).toSeq - // We need the types so we can output struct field names - val types = executedPlan.output.map(_.dataType) - // Reformat to match hive tab delimited output. - result.map(_.zip(types).map(e => toHiveString(e))) - .map(_.mkString("\t")) - } - } + def hiveResultString(executedPlan: SparkPlan): Seq[String] = executedPlan match { + case ExecutedCommandExec(_: DescribeCommandBase) => + // If it is a describe command for a Hive table, we want to have the output format + // be similar with Hive. 
+ executedPlan.executeCollectPublic().map { + case Row(name: String, dataType: String, comment) => + Seq(name, dataType, + Option(comment.asInstanceOf[String]).getOrElse("")) + .map(s => String.format(s"%-20s", s)) + .mkString("\t") + } + // SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp. + case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended => + command.executeCollect().map(_.getString(1)) + case other => + val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq + // We need the types so we can output struct field names + val types = executedPlan.output.map(_.dataType) + // Reformat to match hive tab delimited output. + result.map(_.zip(types).map(e => toHiveString(e))) + .map(_.mkString("\t")) } private lazy val zoneId = DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 87de8f5..563b4d1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -190,7 +190,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark example.split(" > ").toList.foreach(_ match { case exampleRe(sql, output) => val df = clonedSpark.sql(sql) - val actual = unindentAndTrim(hiveResultString(df).mkString("\n")) + val actual = unindentAndTrim( + hiveResultString(df.queryExecution.executedPlan).mkString("\n")) val expected = unindentAndTrim(output) assert(actual === expected) case _ => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index 34829f1..6c66166 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -511,7 +511,7 @@ 
class SQLQueryTestSuite extends QueryTest with SharedSparkSession { val schema = df.schema.catalogString // Get answer, but also get rid of the #1234 expression ids that show up in explain plans val answer = SQLExecution.withNewExecutionId(df.queryExecution, Some(sql)) { - hiveResultString(df).map(replaceNotIncludedMsg) + hiveResultString(df.queryExecution.executedPlan).map(replaceNotIncludedMsg) } // If the output is not pre-sorted, sort it. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala index bddd15c..bb59b12 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala @@ -25,10 +25,11 @@ class HiveResultSuite extends SharedSparkSession { test("date formatting in hive result") { val dates = Seq("2018-12-28", "1582-10-13", "1582-10-14", "1582-10-15") val df = dates.toDF("a").selectExpr("cast(a as date) as b") - val result = HiveResult.hiveResultString(df) + val executedPlan1 = df.queryExecution.executedPlan + val result = HiveResult.hiveResultString(executedPlan1) assert(result == dates) - val df2 = df.selectExpr("array(b)") - val result2 = HiveResult.hiveResultString(df2) + val executedPlan2 = df.selectExpr("array(b)").queryExecution.executedPlan + val result2 = HiveResult.hiveResultString(executedPlan2) assert(result2 == dates.map(x => s"[$x]")) } @@ -39,10 +40,11 @@ class HiveResultSuite extends SharedSparkSession { "1582-10-14 01:02:03", "1582-10-15 01:02:03") val df = timestamps.toDF("a").selectExpr("cast(a as timestamp) as b") - val result = HiveResult.hiveResultString(df) + val executedPlan1 = df.queryExecution.executedPlan + val result = HiveResult.hiveResultString(executedPlan1) assert(result == timestamps) - val df2 = df.selectExpr("array(b)") - val result2 = HiveResult.hiveResultString(df2) + val executedPlan2 = 
df.selectExpr("array(b)").queryExecution.executedPlan + val result2 = HiveResult.hiveResultString(executedPlan2) assert(result2 == timestamps.map(x => s"[$x]")) } @@ -55,14 +57,15 @@ class HiveResultSuite extends SharedSparkSession { test("decimal formatting in hive result") { val df = Seq(new java.math.BigDecimal("1")).toDS() Seq(2, 6, 18).foreach { scala => - val decimalDf = df.selectExpr(s"CAST(value AS decimal(38, $scala))") - val result = HiveResult.hiveResultString(decimalDf) + val executedPlan = + df.selectExpr(s"CAST(value AS decimal(38, $scala))").queryExecution.executedPlan + val result = HiveResult.hiveResultString(executedPlan) assert(result.head.split("\\.").last.length === scala) } - val df2 = Seq(java.math.BigDecimal.ZERO).toDS() - .selectExpr(s"CAST(value AS decimal(38, 8))") - val result = HiveResult.hiveResultString(df2) + val executedPlan = Seq(java.math.BigDecimal.ZERO).toDS() + .selectExpr(s"CAST(value AS decimal(38, 8))").queryExecution.executedPlan + val result = HiveResult.hiveResultString(executedPlan) assert(result.head === "0.00000000") } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 7bcd803..cf0e5eb 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.thriftserver import java.security.PrivilegedExceptionAction import java.sql.{Date, Timestamp} -import java.time.{Instant, LocalDate} import java.util.{Arrays, Map => JMap, UUID} import java.util.concurrent.RejectedExecutionException @@ -179,14 +178,7 @@ private[hive] class SparkExecuteStatementOperation( } curCol += 1 } - // Convert date-time instances 
to types that are acceptable by Hive libs - // used in conversions to strings. - val resultRow = row.map { - case i: Instant => Timestamp.from(i) - case ld: LocalDate => Date.valueOf(ld) - case other => other - }.toArray.asInstanceOf[Array[Object]] - resultRowSet.addRow(resultRow) + resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]]) curRow += 1 resultOffset += 1 } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala index 64e91f4..12fba0e 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala @@ -60,10 +60,9 @@ private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlCont // TODO unify the error code try { context.sparkContext.setJobDescription(command) - val df = context.sql(command) - val execution = df.queryExecution + val execution = context.sessionState.executePlan(context.sql(command).logicalPlan) hiveResponse = SQLExecution.withNewExecutionId(execution) { - hiveResultString(df) + hiveResultString(execution.executedPlan) } tableSchema = getResultSetSchema(execution) new CommandProcessorResponse(0) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index 82fe274..8b1f4c9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -346,9 +346,7 @@ abstract class HiveComparisonTest val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) => val query = new TestHiveQueryExecution(queryString.replace("../../data", testDataPath)) def 
getResult(): Seq[String] = { - SQLExecution.withNewExecutionId(query) { - hiveResultString(Dataset.ofRows(query.sparkSession, query.logical)) - } + SQLExecution.withNewExecutionId(query)(hiveResultString(query.executedPlan)) } try { (query, prepareAnswer(query, getResult())) } catch { case e: Throwable => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org