This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new dde7e45  [SPARK-31204][SQL] HiveResult compatibility for DatasourceV2 command
dde7e45 is described below

commit dde7e457e8aed561dfdc5309952bbfc99ddfc1a6
Author: Terry Kim <yumin...@gmail.com>
AuthorDate: Fri Mar 27 12:48:14 2020 +0800

    [SPARK-31204][SQL] HiveResult compatibility for DatasourceV2 command
    
    ### What changes were proposed in this pull request?
    
    `HiveResult` performs some conversions for commands to be compatible with Hive output, e.g.:
    ```
    // If it is a describe command for a Hive table, we want to have the output format be similar with Hive.
    case ExecutedCommandExec(_: DescribeCommandBase) =>
    ...
    // SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp.
    case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
    ```
    This conversion is needed for DatasourceV2 commands as well, and this PR proposes to add the conversion for the v2 commands `SHOW TABLES` and `DESCRIBE TABLE`.
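
    Conceptually, both conversions are plain string transforms over the command's output rows. A rough standalone sketch (illustrative Scala only, simplified from the actual pattern match in the diff below; these helper names are not part of the patch):
    ```
    // SHOW TABLES: Hive prints only the table name, so drop the namespace column.
    def showTablesOutput(rows: Seq[(String, String)]): Seq[String] =
      rows.map { case (_, tableName) => tableName }

    // DESCRIBE TABLE: left-justify each field to a 20-char column, then tab-join.
    def describeOutput(name: String, dataType: String, comment: String): String =
      Seq(name, dataType, comment).map(s => String.format("%-20s", s)).mkString("\t")
    ```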
    
    ### Why are the changes needed?
    
    This fixes a bug where the conversion is not applied to v2 commands.
    
    ### Does this PR introduce any user-facing change?
    
    Yes, now the outputs for the v2 commands `SHOW TABLES` and `DESCRIBE TABLE` are compatible with Hive output.
    
    For example, with a table created as:
    ```
    CREATE TABLE testcat.ns.tbl (id bigint COMMENT 'col1') USING foo
    ```
    
    The output of `SHOW TABLES` has changed from
    ```
    ns    tbl
    ```
    to
    ```
    tbl
    ```
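
    The v2 `SHOW TABLES` output schema is namespace followed by table name, which is why the conversion keeps only column 1 of each row. A quick illustration, assuming Spark's `Row` is on the classpath:
    ```
    import org.apache.spark.sql.Row

    // A v2 SHOW TABLES row has the shape (namespace, tableName).
    val row = Row("ns", "tbl")
    println(row.getString(1))  // prints: tbl
    ```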
    
    And the output of `DESCRIBE TABLE` has changed from
    ```
    id    bigint    col1
    
    # Partitioning
    Not partitioned
    ```
    to
    ```
    id                      bigint                  col1
    
    # Partitioning
    Not partitioned
    ```
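
    The wider spacing comes from left-justifying each field to a 20-character column before tab-joining, as the v1 DESCRIBE path already did. A quick sanity check of that format string (illustration only, not part of the patch):
    ```
    // %-20s left-justifies a value within a 20-character field.
    assert(String.format("%-20s", "id") == "id" + " " * 18)
    assert(String.format("%-20s", "bigint").length == 20)
    ```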
    
    ### How was this patch tested?
    
    Added unit tests.
    
    Closes #28004 from imback82/hive_result.
    
    Authored-by: Terry Kim <yumin...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit a97d3b9f4f4ddd215ecaa7f96c64aeba6e825f74)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/execution/HiveResult.scala    | 29 +++++++++++++-------
 .../spark/sql/execution/HiveResultSuite.scala      | 32 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
index ff820bf..21874bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
@@ -24,6 +24,7 @@ import java.time.{Instant, LocalDate}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.execution.command.{DescribeCommandBase, ExecutedCommandExec, ShowTablesCommand}
+import org.apache.spark.sql.execution.datasources.v2.{DescribeTableExec, ShowTablesExec}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
@@ -38,18 +39,17 @@ object HiveResult {
    */
   def hiveResultString(executedPlan: SparkPlan): Seq[String] = executedPlan match {
     case ExecutedCommandExec(_: DescribeCommandBase) =>
-      // If it is a describe command for a Hive table, we want to have the output format
-      // be similar with Hive.
-      executedPlan.executeCollectPublic().map {
-        case Row(name: String, dataType: String, comment) =>
-          Seq(name, dataType,
-            Option(comment.asInstanceOf[String]).getOrElse(""))
-            .map(s => String.format(s"%-20s", s))
-            .mkString("\t")
-      }
-    // SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp.
+      formatDescribeTableOutput(executedPlan.executeCollectPublic())
+    case _: DescribeTableExec =>
+      formatDescribeTableOutput(executedPlan.executeCollectPublic())
+    // SHOW TABLES in Hive only outputs table names, while our v1 command outputs
+    // database, table name, isTemp.
     case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
       command.executeCollect().map(_.getString(1))
+    // SHOW TABLES in Hive only outputs table names, while our v2 command outputs
+    // namespace and table name.
+    case command: ShowTablesExec =>
+      command.executeCollect().map(_.getString(1))
     case other =>
       val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
       // We need the types so we can output struct field names
@@ -59,6 +59,15 @@ object HiveResult {
         .map(_.mkString("\t"))
   }
 
+  private def formatDescribeTableOutput(rows: Array[Row]): Seq[String] = {
+    rows.map {
+      case Row(name: String, dataType: String, comment) =>
+        Seq(name, dataType, Option(comment.asInstanceOf[String]).getOrElse(""))
+          .map(s => String.format(s"%-20s", s))
+          .mkString("\t")
+    }
+  }
+
   private def zoneId = DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)
   private def dateFormatter = DateFormatter(zoneId)
   private def timestampFormatter = TimestampFormatter.getFractionFormatter(zoneId)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
index bf7cbaa..5e81c74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.sql.connector.InMemoryTableCatalog
 import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSparkSession}
 
 class HiveResultSuite extends SharedSparkSession {
@@ -68,4 +69,35 @@ class HiveResultSuite extends SharedSparkSession {
     val result = HiveResult.hiveResultString(executedPlan)
     assert(result.head === "0.00000000")
   }
+
+  test("SHOW TABLES in hive result") {
+    withSQLConf("spark.sql.catalog.testcat" -> classOf[InMemoryTableCatalog].getName) {
+      Seq(("testcat.ns", "tbl", "foo"), ("spark_catalog.default", "tbl", "csv")).foreach {
+        case (ns, tbl, source) =>
+          withTable(s"$ns.$tbl") {
+            spark.sql(s"CREATE TABLE $ns.$tbl (id bigint) USING $source")
+            val df = spark.sql(s"SHOW TABLES FROM $ns")
+            val executedPlan = df.queryExecution.executedPlan
+            assert(HiveResult.hiveResultString(executedPlan).head == tbl)
+          }
+      }
+    }
+  }
+
+  test("DESCRIBE TABLE in hive result") {
+    withSQLConf("spark.sql.catalog.testcat" -> classOf[InMemoryTableCatalog].getName) {
+      Seq(("testcat.ns", "tbl", "foo"), ("spark_catalog.default", "tbl", "csv")).foreach {
+        case (ns, tbl, source) =>
+          withTable(s"$ns.$tbl") {
+            spark.sql(s"CREATE TABLE $ns.$tbl (id bigint COMMENT 'col1') USING 
$source")
+            val df = spark.sql(s"DESCRIBE $ns.$tbl")
+            val executedPlan = df.queryExecution.executedPlan
+            val expected = "id                  " +
+              "\tbigint              " +
+              "\tcol1                "
+            assert(HiveResult.hiveResultString(executedPlan).head == expected)
+          }
+      }
+    }
+  }
 }

