Repository: spark
Updated Branches:
  refs/heads/branch-2.1 d30238f1b -> 43084b3cc


[SPARK-19459][SQL][BRANCH-2.1] Support for nested char/varchar fields in ORC

## What changes were proposed in this pull request?
This is a backport of the two following commits: 
https://github.com/apache/spark/commit/78eae7e67fd5dec0c2d5b18000053ce86cd0f1ae 
& 
https://github.com/apache/spark/commit/de8a03e68202647555e30fffba551f65bc77608d

This PR adds support for ORC tables with (nested) char/varchar fields.

## How was this patch tested?
Added a regression test to `OrcSourceSuite`.

Author: Herman van Hovell <hvanhov...@databricks.com>

Closes #17041 from hvanhovell/SPARK-19459-branch-2.1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43084b3c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43084b3c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43084b3c

Branch: refs/heads/branch-2.1
Commit: 43084b3cc3918b720fe28053d2037fa22a71264e
Parents: d30238f
Author: Herman van Hovell <hvanhov...@databricks.com>
Authored: Thu Feb 23 14:58:02 2017 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Thu Feb 23 14:58:02 2017 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/parser/AstBuilder.scala  | 40 ++++++++---
 .../apache/spark/sql/types/HiveStringType.scala | 73 ++++++++++++++++++++
 .../org/apache/spark/sql/types/package.scala    | 10 ++-
 .../spark/sql/sources/TableScanSuite.scala      |  7 +-
 .../org/apache/spark/sql/hive/HiveUtils.scala   |  8 ---
 .../spark/sql/hive/MetastoreRelation.scala      |  7 +-
 .../spark/sql/hive/client/HiveClientImpl.scala  |  8 +--
 .../spark/sql/hive/orc/OrcSourceSuite.scala     | 39 +++++++++--
 8 files changed, 161 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/43084b3c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 06f0f5b..a3b39a8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -76,7 +76,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with 
Logging {
   }
 
   override def visitSingleDataType(ctx: SingleDataTypeContext): DataType = 
withOrigin(ctx) {
-    visit(ctx.dataType).asInstanceOf[DataType]
+    visitSparkDataType(ctx.dataType)
   }
 
   /* 
********************************************************************************************
@@ -997,7 +997,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with 
Logging {
    * Create a [[Cast]] expression.
    */
   override def visitCast(ctx: CastContext): Expression = withOrigin(ctx) {
-    Cast(expression(ctx.expression), typedVisit(ctx.dataType))
+    Cast(expression(ctx.expression), visitSparkDataType(ctx.dataType))
   }
 
   /**
@@ -1416,6 +1416,13 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with 
Logging {
    * DataType parsing
    * 
********************************************************************************************
 */
   /**
+   * Create a Spark DataType.
+   */
+  private def visitSparkDataType(ctx: DataTypeContext): DataType = {
+    HiveStringType.replaceCharType(typedVisit(ctx))
+  }
+
+  /**
    * Resolve/create a primitive type.
    */
   override def visitPrimitiveDataType(ctx: PrimitiveDataTypeContext): DataType 
= withOrigin(ctx) {
@@ -1429,8 +1436,9 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with 
Logging {
       case ("double", Nil) => DoubleType
       case ("date", Nil) => DateType
       case ("timestamp", Nil) => TimestampType
-      case ("char" | "varchar" | "string", Nil) => StringType
-      case ("char" | "varchar", _ :: Nil) => StringType
+      case ("string", Nil) => StringType
+      case ("char", length :: Nil) => CharType(length.getText.toInt)
+      case ("varchar", length :: Nil) => VarcharType(length.getText.toInt)
       case ("binary", Nil) => BinaryType
       case ("decimal", Nil) => DecimalType.USER_DEFAULT
       case ("decimal", precision :: Nil) => 
DecimalType(precision.getText.toInt, 0)
@@ -1452,7 +1460,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with 
Logging {
       case SqlBaseParser.MAP =>
         MapType(typedVisit(ctx.dataType(0)), typedVisit(ctx.dataType(1)))
       case SqlBaseParser.STRUCT =>
-        createStructType(ctx.complexColTypeList())
+        
StructType(Option(ctx.complexColTypeList).toSeq.flatMap(visitComplexColTypeList))
     }
   }
 
@@ -1471,12 +1479,28 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] 
with Logging {
   }
 
   /**
-   * Create a [[StructField]] from a column definition.
+   * Create a top level [[StructField]] from a column definition.
    */
   override def visitColType(ctx: ColTypeContext): StructField = 
withOrigin(ctx) {
     import ctx._
-    val structField = StructField(identifier.getText, typedVisit(dataType), 
nullable = true)
-    if (STRING == null) structField else 
structField.withComment(string(STRING))
+
+    val builder = new MetadataBuilder
+    // Add comment to metadata
+    if (STRING != null) {
+      builder.putString("comment", string(STRING))
+    }
+    // Add Hive type string to metadata.
+    val rawDataType = typedVisit[DataType](ctx.dataType)
+    val cleanedDataType = HiveStringType.replaceCharType(rawDataType)
+    if (rawDataType != cleanedDataType) {
+      builder.putString(HIVE_TYPE_STRING, rawDataType.catalogString)
+    }
+
+    StructField(
+      identifier.getText,
+      cleanedDataType,
+      nullable = true,
+      builder.build())
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/43084b3c/sql/catalyst/src/main/scala/org/apache/spark/sql/types/HiveStringType.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/HiveStringType.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/HiveStringType.scala
new file mode 100644
index 0000000..b319eb7
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/HiveStringType.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.types
+
+import scala.math.Ordering
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A hive string type for compatibility. These datatypes should only used for 
parsing,
+ * and should NOT be used anywhere else. Any instance of these data types 
should be
+ * replaced by a [[StringType]] before analysis.
+ */
+sealed abstract class HiveStringType extends AtomicType {
+  private[sql] type InternalType = UTF8String
+
+  private[sql] val ordering = implicitly[Ordering[InternalType]]
+
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized {
+    typeTag[InternalType]
+  }
+
+  override def defaultSize: Int = length
+
+  private[spark] override def asNullable: HiveStringType = this
+
+  def length: Int
+}
+
+object HiveStringType {
+  def replaceCharType(dt: DataType): DataType = dt match {
+    case ArrayType(et, nullable) =>
+      ArrayType(replaceCharType(et), nullable)
+    case MapType(kt, vt, nullable) =>
+      MapType(replaceCharType(kt), replaceCharType(vt), nullable)
+    case StructType(fields) =>
+      StructType(fields.map { field =>
+        field.copy(dataType = replaceCharType(field.dataType))
+      })
+    case _: HiveStringType => StringType
+    case _ => dt
+  }
+}
+
+/**
+ * Hive char type.
+ */
+case class CharType(length: Int) extends HiveStringType {
+  override def simpleString: String = s"char($length)"
+}
+
+/**
+ * Hive varchar type.
+ */
+case class VarcharType(length: Int) extends HiveStringType {
+  override def simpleString: String = s"varchar($length)"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/43084b3c/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala
index 346a51e..f29cbc2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala
@@ -21,4 +21,12 @@ package org.apache.spark.sql
  * Contains a type system for attributes produced by relations, including 
complex types like
  * structs, arrays and maps.
  */
-package object types
+package object types {
+  /**
+   * Metadata key used to store the raw hive type string in the metadata of 
StructField. This
+   * is relevant for datatypes that do not have a direct Spark SQL 
counterpart, such as CHAR and
+   * VARCHAR. We need to preserve the original type in order to invoke the 
correct object
+   * inspector in Hive.
+   */
+  val HIVE_TYPE_STRING = "HIVE_TYPE_STRING"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/43084b3c/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index 86bcb4d..eaa5fb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -203,6 +203,9 @@ class TableScanSuite extends DataSourceTest with 
SharedSQLContext {
     (2 to 10).map(i => Row(i, i - 1)).toSeq)
 
   test("Schema and all fields") {
+    def hiveMetadata(dt: String): Metadata = {
+      new MetadataBuilder().putString(HIVE_TYPE_STRING, dt).build()
+    }
     val expectedSchema = StructType(
       StructField("string$%Field", StringType, true) ::
       StructField("binaryField", BinaryType, true) ::
@@ -217,8 +220,8 @@ class TableScanSuite extends DataSourceTest with 
SharedSQLContext {
       StructField("decimalField2", DecimalType(9, 2), true) ::
       StructField("dateField", DateType, true) ::
       StructField("timestampField", TimestampType, true) ::
-      StructField("varcharField", StringType, true) ::
-      StructField("charField", StringType, true) ::
+      StructField("varcharField", StringType, true, 
hiveMetadata("varchar(12)")) ::
+      StructField("charField", StringType, true, hiveMetadata("char(18)")) ::
       StructField("arrayFieldSimple", ArrayType(IntegerType), true) ::
       StructField("arrayFieldComplex",
         ArrayType(

http://git-wip-us.apache.org/repos/asf/spark/blob/43084b3c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 26b1994..81cd65c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -54,14 +54,6 @@ private[spark] object HiveUtils extends Logging {
   /** The version of hive used internally by Spark SQL. */
   val hiveExecutionVersion: String = "1.2.1"
 
-  /**
-   * The property key that is used to store the raw hive type string in the 
metadata of StructField.
-   * For example, in the case where the Hive type is varchar, the type gets 
mapped to a string type
-   * in Spark SQL, but we need to preserve the original type in order to 
invoke the correct object
-   * inspector in Hive.
-   */
-  val hiveTypeString: String = "HIVE_TYPE_STRING"
-
   val HIVE_METASTORE_VERSION = 
SQLConfigBuilder("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
         s"<code>0.12.0</code> through <code>$hiveExecutionVersion</code>.")

http://git-wip-us.apache.org/repos/asf/spark/blob/43084b3c/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 3bbac05..8f40a59 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -35,8 +35,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeMap, 
AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
 import org.apache.spark.sql.execution.FileRelation
-import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.types.StructField
+import org.apache.spark.sql.types._
 
 
 private[hive] case class MetastoreRelation(
@@ -61,8 +60,8 @@ private[hive] case class MetastoreRelation(
   override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: 
sparkSession :: Nil
 
   private def toHiveColumn(c: StructField): FieldSchema = {
-    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
-      c.metadata.getString(HiveUtils.hiveTypeString)
+    val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
+      c.metadata.getString(HIVE_TYPE_STRING)
     } else {
       c.dataType.catalogString
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/43084b3c/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index a9ca1a4..9b3f299 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.util.{CircularBuffer, Utils}
 
 /**
@@ -777,8 +777,8 @@ private[hive] class HiveClientImpl(
       .asInstanceOf[Class[_ <: 
org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
 
   private def toHiveColumn(c: StructField): FieldSchema = {
-    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
-      c.metadata.getString(HiveUtils.hiveTypeString)
+    val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
+      c.metadata.getString(HIVE_TYPE_STRING)
     } else {
       c.dataType.catalogString
     }
@@ -793,7 +793,7 @@ private[hive] class HiveClientImpl(
         throw new SparkException("Cannot recognize hive type string: " + 
hc.getType, e)
     }
 
-    val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, 
hc.getType).build()
+    val metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, 
hc.getType).build()
     val field = StructField(
       name = hc.getName,
       dataType = columnType,

http://git-wip-us.apache.org/repos/asf/spark/blob/43084b3c/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 2b40469..aa60a3f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -154,12 +154,43 @@ abstract class OrcSuite extends QueryTest with 
TestHiveSingleton with BeforeAndA
 
   test("SPARK-18220: read Hive orc table with varchar column") {
     val hiveClient = 
spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+    val location = Utils.createTempDir()
+    val uri = location.toURI
     try {
-      hiveClient.runSqlHive("CREATE TABLE orc_varchar(a VARCHAR(10)) STORED AS 
orc")
-      hiveClient.runSqlHive("INSERT INTO TABLE orc_varchar SELECT 'a' FROM 
(SELECT 1) t")
-      checkAnswer(spark.table("orc_varchar"), Row("a"))
+      hiveClient.runSqlHive(
+        """
+           |CREATE EXTERNAL TABLE hive_orc(
+           |  a STRING,
+           |  b CHAR(10),
+           |  c VARCHAR(10),
+           |  d ARRAY<CHAR(3)>)
+           |STORED AS orc""".stripMargin)
+      // Hive throws an exception if I assign the location in the create table 
statement.
+      hiveClient.runSqlHive(
+        s"ALTER TABLE hive_orc SET LOCATION '$uri'")
+      hiveClient.runSqlHive(
+        """INSERT INTO TABLE hive_orc
+          |SELECT 'a', 'b', 'c', ARRAY(CAST('d' AS CHAR(3)))
+          |FROM (SELECT 1) t""".stripMargin)
+
+      // We create a different table in Spark using the same schema which 
points to
+      // the same location.
+      spark.sql(
+        s"""
+           |CREATE EXTERNAL TABLE spark_orc(
+           |  a STRING,
+           |  b CHAR(10),
+           |  c VARCHAR(10),
+           |  d ARRAY<CHAR(3)>)
+           |STORED AS orc
+           |LOCATION '$uri'""".stripMargin)
+      val result = Row("a", "b         ", "c", Seq("d  "))
+      checkAnswer(spark.table("hive_orc"), result)
+      checkAnswer(spark.table("spark_orc"), result)
     } finally {
-      hiveClient.runSqlHive("DROP TABLE IF EXISTS orc_varchar")
+      hiveClient.runSqlHive("DROP TABLE IF EXISTS hive_orc")
+      hiveClient.runSqlHive("DROP TABLE IF EXISTS spark_orc")
+      Utils.deleteRecursively(location)
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to