This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new a97d3b9 [SPARK-31204][SQL] HiveResult compatibility for DatasourceV2 command a97d3b9 is described below commit a97d3b9f4f4ddd215ecaa7f96c64aeba6e825f74 Author: Terry Kim <yumin...@gmail.com> AuthorDate: Fri Mar 27 12:48:14 2020 +0800 [SPARK-31204][SQL] HiveResult compatibility for DatasourceV2 command ### What changes were proposed in this pull request? `HiveResult` performs some conversions for commands to be compatible with Hive output, e.g.: ``` // If it is a describe command for a Hive table, we want to have the output format be similar with Hive. case ExecutedCommandExec(_: DescribeCommandBase) => ... // SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp. case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended => ``` This conversion is needed for DatasourceV2 commands as well and this PR proposes to add the conversion for v2 commands `SHOW TABLES` and `DESCRIBE TABLE`. ### Why are the changes needed? This is a bug where conversion is not applied to v2 commands. ### Does this PR introduce any user-facing change? Yes, now the outputs for v2 commands `SHOW TABLES` and `DESCRIBE TABLE` are compatible with HIVE output. For example, with a table created as: ``` CREATE TABLE testcat.ns.tbl (id bigint COMMENT 'col1') USING foo ``` The output of `SHOW TABLES` has changed from ``` ns table ``` to ``` table ``` And the output of `DESCRIBE TABLE` has changed from ``` id bigint col1 # Partitioning Not partitioned ``` to ``` id bigint col1 # Partitioning Not partitioned ``` ### How was this patch tested? Added unit tests. Closes #28004 from imback82/hive_result. 
Authored-by: Terry Kim <yumin...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../apache/spark/sql/execution/HiveResult.scala | 29 +++++++++++++------- .../spark/sql/execution/HiveResultSuite.scala | 32 ++++++++++++++++++++++ 2 files changed, 51 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala index ff820bf..21874bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala @@ -24,6 +24,7 @@ import java.time.{Instant, LocalDate} import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter} import org.apache.spark.sql.execution.command.{DescribeCommandBase, ExecutedCommandExec, ShowTablesCommand} +import org.apache.spark.sql.execution.datasources.v2.{DescribeTableExec, ShowTablesExec} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.CalendarInterval @@ -38,18 +39,17 @@ object HiveResult { */ def hiveResultString(executedPlan: SparkPlan): Seq[String] = executedPlan match { case ExecutedCommandExec(_: DescribeCommandBase) => - // If it is a describe command for a Hive table, we want to have the output format - // be similar with Hive. - executedPlan.executeCollectPublic().map { - case Row(name: String, dataType: String, comment) => - Seq(name, dataType, - Option(comment.asInstanceOf[String]).getOrElse("")) - .map(s => String.format(s"%-20s", s)) - .mkString("\t") - } - // SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp. 
+ formatDescribeTableOutput(executedPlan.executeCollectPublic()) + case _: DescribeTableExec => + formatDescribeTableOutput(executedPlan.executeCollectPublic()) + // SHOW TABLES in Hive only output table names while our v1 command outputs + // database, table name, isTemp. case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended => command.executeCollect().map(_.getString(1)) + // SHOW TABLES in Hive only output table names while our v2 command outputs + // namespace and table name. + case command : ShowTablesExec => + command.executeCollect().map(_.getString(1)) case other => val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq // We need the types so we can output struct field names @@ -59,6 +59,15 @@ object HiveResult { .map(_.mkString("\t")) } + private def formatDescribeTableOutput(rows: Array[Row]): Seq[String] = { + rows.map { + case Row(name: String, dataType: String, comment) => + Seq(name, dataType, Option(comment.asInstanceOf[String]).getOrElse("")) + .map(s => String.format(s"%-20s", s)) + .mkString("\t") + } + } + private def zoneId = DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone) private def dateFormatter = DateFormatter(zoneId) private def timestampFormatter = TimestampFormatter.getFractionFormatter(zoneId) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala index bf7cbaa..5e81c74 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution +import org.apache.spark.sql.connector.InMemoryTableCatalog import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSparkSession} class HiveResultSuite extends SharedSparkSession { @@ -68,4 +69,35 @@ class HiveResultSuite extends SharedSparkSession { val result = 
HiveResult.hiveResultString(executedPlan) assert(result.head === "0.00000000") } + + test("SHOW TABLES in hive result") { + withSQLConf("spark.sql.catalog.testcat" -> classOf[InMemoryTableCatalog].getName) { + Seq(("testcat.ns", "tbl", "foo"), ("spark_catalog.default", "tbl", "csv")).foreach { + case (ns, tbl, source) => + withTable(s"$ns.$tbl") { + spark.sql(s"CREATE TABLE $ns.$tbl (id bigint) USING $source") + val df = spark.sql(s"SHOW TABLES FROM $ns") + val executedPlan = df.queryExecution.executedPlan + assert(HiveResult.hiveResultString(executedPlan).head == tbl) + } + } + } + } + + test("DESCRIBE TABLE in hive result") { + withSQLConf("spark.sql.catalog.testcat" -> classOf[InMemoryTableCatalog].getName) { + Seq(("testcat.ns", "tbl", "foo"), ("spark_catalog.default", "tbl", "csv")).foreach { + case (ns, tbl, source) => + withTable(s"$ns.$tbl") { + spark.sql(s"CREATE TABLE $ns.$tbl (id bigint COMMENT 'col1') USING $source") + val df = spark.sql(s"DESCRIBE $ns.$tbl") + val executedPlan = df.queryExecution.executedPlan + val expected = "id " + + "\tbigint " + + "\tcol1 " + assert(HiveResult.hiveResultString(executedPlan).head == expected) + } + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org