asfgit closed pull request #23409: [SPARK-26502][SQL] Move hiveResultString() 
from QueryExecution to HiveResult
URL: https://github.com/apache/spark/pull/23409
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
new file mode 100644
index 0000000000000..22d3ca958a210
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.nio.charset.StandardCharsets
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.command.{DescribeTableCommand, 
ExecutedCommandExec, ShowTablesCommand}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * Runs a query returning the result in Hive compatible form.
+ */
+object HiveResult {
+  /**
+   * Returns the result as a hive compatible sequence of strings. This is used 
in tests and
+   * `SparkSQLDriver` for CLI applications.
+   */
+  def hiveResultString(executedPlan: SparkPlan): Seq[String] = executedPlan 
match {
+    case ExecutedCommandExec(desc: DescribeTableCommand) =>
+      // If it is a describe command for a Hive table, we want to have the 
output format
+      // be similar with Hive.
+      executedPlan.executeCollectPublic().map {
+        case Row(name: String, dataType: String, comment) =>
+          Seq(name, dataType,
+            Option(comment.asInstanceOf[String]).getOrElse(""))
+            .map(s => String.format(s"%-20s", s))
+            .mkString("\t")
+      }
+    // SHOW TABLES in Hive only output table names, while ours output 
database, table name, isTemp.
+    case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended 
=>
+      command.executeCollect().map(_.getString(1))
+    case other =>
+      val result: Seq[Seq[Any]] = 
other.executeCollectPublic().map(_.toSeq).toSeq
+      // We need the types so we can output struct field names
+      val types = executedPlan.output.map(_.dataType)
+      // Reformat to match hive tab delimited output.
+      result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t"))
+  }
+
+  /** Formats a datum (based on the given data type) and returns the string 
representation. */
+  private def toHiveString(a: (Any, DataType)): String = {
+    val primitiveTypes = Seq(StringType, IntegerType, LongType, DoubleType, 
FloatType,
+      BooleanType, ByteType, ShortType, DateType, TimestampType, BinaryType)
+    val timeZone = DateTimeUtils.getTimeZone(SQLConf.get.sessionLocalTimeZone)
+
+    def formatDecimal(d: java.math.BigDecimal): String = {
+      if (d.compareTo(java.math.BigDecimal.ZERO) == 0) {
+        java.math.BigDecimal.ZERO.toPlainString
+      } else {
+        d.stripTrailingZeros().toPlainString
+      }
+    }
+
+    /** Hive outputs fields of structs slightly differently than top level 
attributes. */
+    def toHiveStructString(a: (Any, DataType)): String = a match {
+      case (struct: Row, StructType(fields)) =>
+        struct.toSeq.zip(fields).map {
+          case (v, t) => s""""${t.name}":${toHiveStructString((v, 
t.dataType))}"""
+        }.mkString("{", ",", "}")
+      case (seq: Seq[_], ArrayType(typ, _)) =>
+        seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
+      case (map: Map[_, _], MapType(kType, vType, _)) =>
+        map.map {
+          case (key, value) =>
+            toHiveStructString((key, kType)) + ":" + 
toHiveStructString((value, vType))
+        }.toSeq.sorted.mkString("{", ",", "}")
+      case (null, _) => "null"
+      case (s: String, StringType) => "\"" + s + "\""
+      case (decimal, DecimalType()) => decimal.toString
+      case (interval, CalendarIntervalType) => interval.toString
+      case (other, tpe) if primitiveTypes contains tpe => other.toString
+    }
+
+    a match {
+      case (struct: Row, StructType(fields)) =>
+        struct.toSeq.zip(fields).map {
+          case (v, t) => s""""${t.name}":${toHiveStructString((v, 
t.dataType))}"""
+        }.mkString("{", ",", "}")
+      case (seq: Seq[_], ArrayType(typ, _)) =>
+        seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
+      case (map: Map[_, _], MapType(kType, vType, _)) =>
+        map.map {
+          case (key, value) =>
+            toHiveStructString((key, kType)) + ":" + 
toHiveStructString((value, vType))
+        }.toSeq.sorted.mkString("{", ",", "}")
+      case (null, _) => "NULL"
+      case (d: Date, DateType) =>
+        DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))
+      case (t: Timestamp, TimestampType) =>
+        DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(t), 
timeZone)
+      case (bin: Array[Byte], BinaryType) => new String(bin, 
StandardCharsets.UTF_8)
+      case (decimal: java.math.BigDecimal, DecimalType()) => 
formatDecimal(decimal)
+      case (interval, CalendarIntervalType) => interval.toString
+      case (other, tpe) if primitiveTypes.contains(tpe) => other.toString
+    }
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 7fccbf65d8525..72499aa936a56 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -18,25 +18,20 @@
 package org.apache.spark.sql.execution
 
 import java.io.{BufferedWriter, OutputStreamWriter}
-import java.nio.charset.StandardCharsets
-import java.sql.{Date, Timestamp}
 
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.command.{DescribeTableCommand, 
ExecutedCommandExec, ShowTablesCommand}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ReuseExchange}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, 
TimestampType, _}
 import org.apache.spark.util.Utils
 
 /**
@@ -109,90 +104,6 @@ class QueryExecution(
     ReuseExchange(sparkSession.sessionState.conf),
     ReuseSubquery(sparkSession.sessionState.conf))
 
-  /**
-   * Returns the result as a hive compatible sequence of strings. This is used 
in tests and
-   * `SparkSQLDriver` for CLI applications.
-   */
-  def hiveResultString(): Seq[String] = executedPlan match {
-    case ExecutedCommandExec(desc: DescribeTableCommand) =>
-      // If it is a describe command for a Hive table, we want to have the 
output format
-      // be similar with Hive.
-      desc.run(sparkSession).map {
-        case Row(name: String, dataType: String, comment) =>
-          Seq(name, dataType,
-            Option(comment.asInstanceOf[String]).getOrElse(""))
-            .map(s => String.format(s"%-20s", s))
-            .mkString("\t")
-      }
-    // SHOW TABLES in Hive only output table names, while ours output 
database, table name, isTemp.
-    case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended 
=>
-      command.executeCollect().map(_.getString(1))
-    case other =>
-      val result: Seq[Seq[Any]] = 
other.executeCollectPublic().map(_.toSeq).toSeq
-      // We need the types so we can output struct field names
-      val types = analyzed.output.map(_.dataType)
-      // Reformat to match hive tab delimited output.
-      result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t"))
-  }
-
-  /** Formats a datum (based on the given data type) and returns the string 
representation. */
-  private def toHiveString(a: (Any, DataType)): String = {
-    val primitiveTypes = Seq(StringType, IntegerType, LongType, DoubleType, 
FloatType,
-      BooleanType, ByteType, ShortType, DateType, TimestampType, BinaryType)
-
-    def formatDecimal(d: java.math.BigDecimal): String = {
-      if (d.compareTo(java.math.BigDecimal.ZERO) == 0) {
-        java.math.BigDecimal.ZERO.toPlainString
-      } else {
-        d.stripTrailingZeros().toPlainString
-      }
-    }
-
-    /** Hive outputs fields of structs slightly differently than top level 
attributes. */
-    def toHiveStructString(a: (Any, DataType)): String = a match {
-      case (struct: Row, StructType(fields)) =>
-        struct.toSeq.zip(fields).map {
-          case (v, t) => s""""${t.name}":${toHiveStructString((v, 
t.dataType))}"""
-        }.mkString("{", ",", "}")
-      case (seq: Seq[_], ArrayType(typ, _)) =>
-        seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
-      case (map: Map[_, _], MapType(kType, vType, _)) =>
-        map.map {
-          case (key, value) =>
-            toHiveStructString((key, kType)) + ":" + 
toHiveStructString((value, vType))
-        }.toSeq.sorted.mkString("{", ",", "}")
-      case (null, _) => "null"
-      case (s: String, StringType) => "\"" + s + "\""
-      case (decimal, DecimalType()) => decimal.toString
-      case (interval, CalendarIntervalType) => interval.toString
-      case (other, tpe) if primitiveTypes contains tpe => other.toString
-    }
-
-    a match {
-      case (struct: Row, StructType(fields)) =>
-        struct.toSeq.zip(fields).map {
-          case (v, t) => s""""${t.name}":${toHiveStructString((v, 
t.dataType))}"""
-        }.mkString("{", ",", "}")
-      case (seq: Seq[_], ArrayType(typ, _)) =>
-        seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
-      case (map: Map[_, _], MapType(kType, vType, _)) =>
-        map.map {
-          case (key, value) =>
-            toHiveStructString((key, kType)) + ":" + 
toHiveStructString((value, vType))
-        }.toSeq.sorted.mkString("{", ",", "}")
-      case (null, _) => "NULL"
-      case (d: Date, DateType) =>
-        DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))
-      case (t: Timestamp, TimestampType) =>
-        DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(t),
-          
DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))
-      case (bin: Array[Byte], BinaryType) => new String(bin, 
StandardCharsets.UTF_8)
-      case (decimal: java.math.BigDecimal, DecimalType()) => 
formatDecimal(decimal)
-      case (interval, CalendarIntervalType) => interval.toString
-      case (other, tpe) if primitiveTypes.contains(tpe) => other.toString
-    }
-  }
-
   def simpleString: String = withRedaction {
     val concat = new StringConcat()
     concat.append("== Physical Plan ==\n")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index b2515226d9a14..24b312348bd67 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -22,11 +22,11 @@ import java.util.{Locale, TimeZone}
 
 import scala.util.control.NonFatal
 
-import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.util.{fileToString, stringToFile}
+import org.apache.spark.sql.execution.HiveResult.hiveResultString
 import org.apache.spark.sql.execution.command.{DescribeColumnCommand, 
DescribeTableCommand}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -287,7 +287,8 @@ class SQLQueryTestSuite extends QueryTest with 
SharedSQLContext {
       val schema = df.schema
       val notIncludedMsg = "[not included in comparison]"
       // Get answer, but also get rid of the #1234 expression ids that show up 
in explain plans
-      val answer = 
df.queryExecution.hiveResultString().map(_.replaceAll("#\\d+", "#x")
+      val answer = hiveResultString(df.queryExecution.executedPlan)
+        .map(_.replaceAll("#\\d+", "#x")
         .replaceAll("Location.*/sql/core/", s"Location 
${notIncludedMsg}sql/core/")
         .replaceAll("Created By.*", s"Created By $notIncludedMsg")
         .replaceAll("Created Time.*", s"Created Time $notIncludedMsg")
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index 6775902173444..960fdd11db15d 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -29,6 +29,7 @@ import 
org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SQLContext}
 import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
+import org.apache.spark.sql.execution.HiveResult.hiveResultString
 
 
 private[hive] class SparkSQLDriver(val context: SQLContext = 
SparkSQLEnv.sqlContext)
@@ -61,7 +62,7 @@ private[hive] class SparkSQLDriver(val context: SQLContext = 
SparkSQLEnv.sqlCont
       context.sparkContext.setJobDescription(command)
       val execution = 
context.sessionState.executePlan(context.sql(command).logicalPlan)
       hiveResponse = SQLExecution.withNewExecutionId(context.sparkSession, 
execution) {
-        execution.hiveResultString()
+        hiveResultString(execution.executedPlan)
       }
       tableSchema = getResultSetSchema(execution)
       new CommandProcessorResponse(0)
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 3508affda241a..4c2bc62b9faf8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -297,7 +297,7 @@ private[hive] class TestHiveSparkSession(
 
   protected[hive] implicit class SqlCmd(sql: String) {
     def cmd: () => Unit = {
-      () => new TestHiveQueryExecution(sql).hiveResultString(): Unit
+      () => new TestHiveQueryExecution(sql).executedPlan.executeCollect(): Unit
     }
   }
 
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 272e6f51f5002..66426824573c6 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.HiveResult.hiveResultString
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.hive.test.{TestHive, TestHiveQueryExecution}
@@ -345,7 +346,8 @@ abstract class HiveComparisonTest
         val catalystResults = queryList.zip(hiveResults).map { case 
(queryString, hive) =>
           val query = new 
TestHiveQueryExecution(queryString.replace("../../data", testDataPath))
           def getResult(): Seq[String] = {
-            SQLExecution.withNewExecutionId(query.sparkSession, 
query)(query.hiveResultString())
+            SQLExecution.withNewExecutionId(
+              query.sparkSession, query)(hiveResultString(query.executedPlan))
           }
           try { (query, prepareAnswer(query, getResult())) } catch {
             case e: Throwable =>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to