Repository: spark
Updated Branches:
  refs/heads/master 064d91ff7 -> 301fb0d72


[SPARK-16731][SQL] use StructType in CatalogTable and remove CatalogColumn

## What changes were proposed in this pull request?

`StructField` has very similar semantics to `CatalogColumn`, except that 
`CatalogColumn` uses a string to express the data type. I think it's reasonable to 
use `StructType` for `CatalogTable.schema` and remove `CatalogColumn`.
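
For illustration, a minimal sketch of what the schema change looks like for callers (the column names are just examples taken from the updated tests in this diff):

```scala
import org.apache.spark.sql.types.StructType

// Before: CatalogTable.schema was a Seq[CatalogColumn], keeping the data type as a raw string:
//   schema = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string"))

// After: CatalogTable.schema is a StructType, so each column is a type-safe StructField:
val schema = new StructType().add("a", "int").add("b", "string")

// Partition columns are likewise exposed as a StructType,
// via the new CatalogTable.partitionSchema helper introduced in this patch.
```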

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #14363 from cloud-fan/column.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/301fb0d7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/301fb0d7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/301fb0d7

Branch: refs/heads/master
Commit: 301fb0d7236eb55d53c9cd60804a2d755b4ad3b2
Parents: 064d91f
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Sun Jul 31 18:18:53 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Sun Jul 31 18:18:53 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   |  9 +---
 .../spark/sql/catalyst/catalog/interface.scala  | 50 +++++---------------
 .../catalyst/catalog/ExternalCatalogSuite.scala | 25 +++++-----
 .../spark/sql/execution/SparkSqlParser.scala    | 27 ++---------
 .../command/createDataSourceTables.scala        |  6 +--
 .../spark/sql/execution/command/ddl.scala       |  2 +-
 .../spark/sql/execution/command/tables.scala    | 24 ++++------
 .../spark/sql/execution/command/views.scala     | 31 ++++++------
 .../apache/spark/sql/internal/CatalogImpl.scala |  4 +-
 .../spark/sql/execution/command/DDLSuite.scala  | 25 +++++-----
 .../spark/sql/hive/MetastoreRelation.scala      | 12 ++---
 .../spark/sql/hive/client/HiveClientImpl.scala  | 24 ++++++----
 .../CreateHiveTableAsSelectCommand.scala        |  6 +--
 .../spark/sql/hive/HiveDDLCommandSuite.scala    | 29 ++++++------
 .../sql/hive/HiveMetastoreCatalogSuite.scala    |  8 ++--
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  7 ++-
 .../spark/sql/hive/client/VersionsSuite.scala   |  6 +--
 17 files changed, 120 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/301fb0d7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index e36241a..980efda 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -259,14 +259,7 @@ class SessionCatalog(
         identifier = tid,
         tableType = CatalogTableType.VIEW,
         storage = CatalogStorageFormat.empty,
-        schema = tempTables(table).output.map { c =>
-          CatalogColumn(
-            name = c.name,
-            dataType = c.dataType.catalogString,
-            nullable = c.nullable,
-            comment = Option(c.name)
-          )
-        },
+        schema = tempTables(table).output.toStructType,
         properties = Map(),
         viewText = None)
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/301fb0d7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 710bce5..38f0bc2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.catalyst.catalog
 
 import java.util.Date
-import javax.annotation.Nullable
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
@@ -26,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.types.StructType
 
 
 /**
@@ -78,28 +78,6 @@ object CatalogStorageFormat {
 }
 
 /**
- * A column in a table.
- */
-case class CatalogColumn(
-    name: String,
-    // TODO: make this type-safe; this is left as a string due to issues in converting Hive
-    // varchars to and from SparkSQL strings.
-    dataType: String,
-    nullable: Boolean = true,
-    comment: Option[String] = None) {
-
-  override def toString: String = {
-    val output =
-      Seq(s"`$name`",
-        dataType,
-        if (!nullable) "NOT NULL" else "",
-        comment.map("(" + _ + ")").getOrElse(""))
-    output.filter(_.nonEmpty).mkString(" ")
-  }
-
-}
-
-/**
  * A partition (Hive style) defined in the catalog.
  *
  * @param spec partition spec values indexed by column name
@@ -141,7 +119,7 @@ case class CatalogTable(
     identifier: TableIdentifier,
     tableType: CatalogTableType,
     storage: CatalogStorageFormat,
-    schema: Seq[CatalogColumn],
+    schema: StructType,
     partitionColumnNames: Seq[String] = Seq.empty,
     bucketSpec: Option[BucketSpec] = None,
     owner: String = "",
@@ -163,9 +141,10 @@ case class CatalogTable(
   requireSubsetOfSchema(bucketSpec.map(_.sortColumnNames).getOrElse(Nil), "sort")
   requireSubsetOfSchema(bucketSpec.map(_.bucketColumnNames).getOrElse(Nil), "bucket")
 
-  /** Columns this table is partitioned by. */
-  def partitionColumns: Seq[CatalogColumn] =
-    schema.filter { c => partitionColumnNames.contains(c.name) }
+  /** schema of this table's partition columns */
+  def partitionSchema: StructType = StructType(schema.filter {
+    c => partitionColumnNames.contains(c.name)
+  })
 
   /** Return the database this table was specified to belong to, assuming it exists. */
   def database: String = identifier.database.getOrElse {
@@ -277,16 +256,13 @@ case class SimpleCatalogRelation(
   override lazy val resolved: Boolean = false
 
   override val output: Seq[Attribute] = {
-    val cols = catalogTable.schema
-      .filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
-    (cols ++ catalogTable.partitionColumns).map { f =>
-      AttributeReference(
-        f.name,
-        CatalystSqlParser.parseDataType(f.dataType),
-        // Since data can be dumped in randomly with no validation, everything is nullable.
-        nullable = true
-      )(qualifier = Some(metadata.identifier.table))
-    }
+    val (partCols, dataCols) = metadata.schema.toAttributes
+      // Since data can be dumped in randomly with no validation, everything is nullable.
+      .map(_.withNullability(true).withQualifier(Some(metadata.identifier.table)))
+      .partition { a =>
+        metadata.partitionColumnNames.contains(a.name)
+      }
+    dataCols ++ partCols
   }
 
   require(

http://git-wip-us.apache.org/repos/asf/spark/blob/301fb0d7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 3a0dcea..963a225 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
 
@@ -551,7 +552,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       identifier = TableIdentifier("my_table", Some("db1")),
       tableType = CatalogTableType.MANAGED,
       storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
-      schema = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string"))
+      schema = new StructType().add("a", "int").add("b", "string")
     )
 
     catalog.createTable("db1", table, ignoreIfExists = false)
@@ -570,7 +571,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       storage = CatalogStorageFormat(
         Some(Utils.createTempDir().getAbsolutePath),
         None, None, None, false, Map.empty),
-      schema = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string"))
+      schema = new StructType().add("a", "int").add("b", "string")
     )
     catalog.createTable("db1", externalTable, ignoreIfExists = false)
     assert(!exists(db.locationUri, "external_table"))
@@ -583,11 +584,11 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       identifier = TableIdentifier("tbl", Some("db1")),
       tableType = CatalogTableType.MANAGED,
       storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
-      schema = Seq(
-        CatalogColumn("col1", "int"),
-        CatalogColumn("col2", "string"),
-        CatalogColumn("a", "int"),
-        CatalogColumn("b", "string")),
+      schema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("a", "int")
+        .add("b", "string"),
       partitionColumnNames = Seq("a", "b")
     )
     catalog.createTable("db1", table, ignoreIfExists = false)
@@ -686,11 +687,11 @@ abstract class CatalogTestUtils {
       identifier = TableIdentifier(name, database),
       tableType = CatalogTableType.EXTERNAL,
       storage = storageFormat,
-      schema = Seq(
-        CatalogColumn("col1", "int"),
-        CatalogColumn("col2", "string"),
-        CatalogColumn("a", "int"),
-        CatalogColumn("b", "string")),
+      schema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("a", "int")
+        .add("b", "string"),
       partitionColumnNames = Seq("a", "b"),
       bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/301fb0d7/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 5e1ad9b..22b1e07 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation,
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, _}
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, StructType}
 
 /**
  * Concrete parser for Spark SQL statements.
@@ -928,13 +928,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
     }
     val comment = Option(ctx.STRING).map(string)
-    val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns)
-    val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns)
+    val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
+    val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
     val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val selectQuery = Option(ctx.query).map(plan)
 
     // Ensuring whether no duplicate name is used in table definition
-    val colNames = cols.map(_.name)
+    val colNames = dataCols.map(_.name)
     if (colNames.length != colNames.distinct.length) {
       val duplicateColumns = colNames.groupBy(identity).collect {
         case (x, ys) if ys.length > 1 => "\"" + x + "\""
@@ -952,7 +952,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
     // Note: Hive requires partition columns to be distinct from the schema, so we need
     // to include the partition columns here explicitly
-    val schema = cols ++ partitionCols
+    val schema = StructType(dataCols ++ partitionCols)
 
     // Storage format
     val defaultStorage: CatalogStorageFormat = {
@@ -1297,23 +1297,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a sequence of [[CatalogColumn]]s from a column list
-   */
-  private def visitCatalogColumns(ctx: ColTypeListContext): Seq[CatalogColumn] = withOrigin(ctx) {
-    ctx.colType.asScala.map { col =>
-      CatalogColumn(
-        col.identifier.getText.toLowerCase,
-        // Note: for types like "STRUCT<myFirstName: STRING, myLastName: STRING>" we can't
-        // just convert the whole type string to lower case, otherwise the struct field names
-        // will no longer be case sensitive. Instead, we rely on our parser to get the proper
-        // case before passing it to Hive.
-        typedVisit[DataType](col.dataType).catalogString,
-        nullable = true,
-        Option(col.STRING).map(string))
-    }
-  }
-
-  /**
    * Create a [[ScriptInputOutputSchema]].
    */
   override protected def withScriptIOSchema(

http://git-wip-us.apache.org/repos/asf/spark/blob/301fb0d7/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index fa3967c..93eb386 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -395,7 +395,7 @@ object CreateDataSourceTableUtils extends Logging {
       CatalogTable(
         identifier = tableIdent,
         tableType = tableType,
-        schema = Nil,
+        schema = new StructType,
         storage = CatalogStorageFormat(
           locationUri = None,
           inputFormat = None,
@@ -424,9 +424,7 @@ object CreateDataSourceTableUtils extends Logging {
           compressed = false,
           properties = options
         ),
-        schema = relation.schema.map { f =>
-          CatalogColumn(f.name, f.dataType.catalogString)
-        },
+        schema = relation.schema,
         properties = tableProperties.toMap,
         viewText = None)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/301fb0d7/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 7e99593..f0e49e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -518,7 +518,7 @@ object DDLUtils {
   }
 
   def isTablePartitioned(table: CatalogTable): Boolean = {
-    table.partitionColumns.nonEmpty || table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
+    table.partitionColumnNames.nonEmpty || table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
   }
 
   // A persisted data source table always store its schema in the catalog.

http://git-wip-us.apache.org/repos/asf/spark/blob/301fb0d7/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index f85373c..e6fe9a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -439,10 +439,10 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
         describeSchema(StructType(partColNames.map(userSpecifiedSchema(_))), buffer)
       }
     } else {
-      if (table.partitionColumns.nonEmpty) {
+      if (table.partitionColumnNames.nonEmpty) {
         append(buffer, "# Partition Information", "", "")
         append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
-        describeSchema(table.partitionColumns, buffer)
+        describeSchema(table.partitionSchema, buffer)
       }
     }
   }
@@ -521,12 +521,6 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
     }
   }
 
-  private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
-    schema.foreach { column =>
-      append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
-    }
-  }
-
   private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = {
     schema.foreach { column =>
       append(buffer, column.name, column.dataType.simpleString, column.getComment().orNull)
@@ -701,7 +695,7 @@ case class ShowPartitionsCommand(
      * thrown if the partitioning spec is invalid.
      */
     if (spec.isDefined) {
-      val badColumns = spec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains)
+      val badColumns = spec.get.keySet.filterNot(tab.partitionColumnNames.contains)
       if (badColumns.nonEmpty) {
         val badCols = badColumns.mkString("[", ", ", "]")
         throw new AnalysisException(
@@ -799,14 +793,14 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
       .foreach(builder.append)
   }
 
-  private def columnToDDLFragment(column: CatalogColumn): String = {
-    val comment = column.comment.map(escapeSingleQuotedString).map(" COMMENT '" + _ + "'")
-    s"${quoteIdentifier(column.name)} ${column.dataType}${comment.getOrElse("")}"
+  private def columnToDDLFragment(column: StructField): String = {
+    val comment = column.getComment().map(escapeSingleQuotedString).map(" COMMENT '" + _ + "'")
+    s"${quoteIdentifier(column.name)} ${column.dataType.catalogString}${comment.getOrElse("")}"
   }
 
   private def showHiveTableNonDataColumns(metadata: CatalogTable, builder: StringBuilder): Unit = {
-    if (metadata.partitionColumns.nonEmpty) {
-      val partCols = metadata.partitionColumns.map(columnToDDLFragment)
+    if (metadata.partitionColumnNames.nonEmpty) {
+      val partCols = metadata.partitionSchema.map(columnToDDLFragment)
       builder ++= partCols.mkString("PARTITIONED BY (", ", ", ")\n")
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/301fb0d7/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 901a9b9..e397cfa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -21,10 +21,11 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.types.StructType
 
 
 /**
@@ -161,18 +162,17 @@ case class CreateViewCommand(
    * SQL based on the analyzed plan, and also creates the proper schema for the view.
    */
   private def prepareTable(sparkSession: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
-    val viewSQL: String = {
-      val logicalPlan = if (userSpecifiedColumns.isEmpty) {
-        analyzedPlan
-      } else {
-        val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
-          case (attr, (colName, _)) => Alias(attr, colName)()
-        }
-        sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
+    val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
+      analyzedPlan
+    } else {
+      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
+        case (attr, (colName, _)) => Alias(attr, colName)()
       }
-      new SQLBuilder(logicalPlan).toSQL
+      sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
     }
 
+    val viewSQL: String = new SQLBuilder(aliasedPlan).toSQL
+
     // Validate the view SQL - make sure we can parse it and analyze it.
     // If we cannot analyze the generated query, there is probably a bug in SQL generation.
     try {
@@ -184,14 +184,11 @@ case class CreateViewCommand(
     }
 
     val viewSchema = if (userSpecifiedColumns.isEmpty) {
-      analyzedPlan.output.map { a =>
-        CatalogColumn(a.name, a.dataType.catalogString)
-      }
+      aliasedPlan.schema
     } else {
-      analyzedPlan.output.zip(userSpecifiedColumns).map {
-        case (a, (name, comment)) =>
-          CatalogColumn(name, a.dataType.catalogString, comment = comment)
-      }
+      StructType(aliasedPlan.schema.zip(userSpecifiedColumns).map {
+        case (field, (_, comment)) => comment.map(field.withComment).getOrElse(field)
+      })
     }
 
     CatalogTable(

http://git-wip-us.apache.org/repos/asf/spark/blob/301fb0d7/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 5393b76..f8f7872 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -157,8 +157,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     val columns = tableMetadata.schema.map { c =>
       new Column(
         name = c.name,
-        description = c.comment.orNull,
-        dataType = c.dataType,
+        description = c.getComment().orNull,
+        dataType = c.dataType.catalogString,
         nullable = c.nullable,
         isPartition = partitionColumnNames.contains(c.name),
         isBucket = bucketColumnNames.contains(c.name))

http://git-wip-us.apache.org/repos/asf/spark/blob/301fb0d7/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 7bd1b0b..564fc73 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogStorageFormat}
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.parser.ParseException
@@ -89,11 +89,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       identifier = name,
       tableType = CatalogTableType.EXTERNAL,
       storage = storage,
-      schema = Seq(
-        CatalogColumn("col1", "int"),
-        CatalogColumn("col2", "string"),
-        CatalogColumn("a", "int"),
-        CatalogColumn("b", "int")),
+      schema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("a", "int")
+        .add("b", "int"),
       partitionColumnNames = Seq("a", "b"),
       createTime = 0L)
   }
@@ -258,9 +258,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       userSpecifiedPartitionCols: Option[String],
       expectedSchema: StructType,
       expectedPartitionCols: Seq[String]): Unit = {
-    var tableSchema = StructType(Nil)
-    var partCols = Seq.empty[String]
-
     val tabName = "tab1"
     withTable(tabName) {
       val partitionClause =
@@ -277,11 +274,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
          """.stripMargin)
       val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
 
-      tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata)
-      partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)
+      assert(expectedSchema ==
+        DDLUtils.getSchemaFromTableProperties(tableMetadata))
+      assert(expectedPartitionCols ==
+        DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata))
     }
-    assert(tableSchema == expectedSchema)
-    assert(partCols == expectedPartitionCols)
   }
 
   test("Create partitioned data source table without user specified schema") {
@@ -601,7 +598,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       sql("CREATE TABLE tbl(a INT, b INT) USING parquet")
       val table = catalog.getTableMetadata(TableIdentifier("tbl"))
       assert(table.tableType == CatalogTableType.MANAGED)
-      assert(table.schema == Seq(CatalogColumn("a", "int"), CatalogColumn("b", "int")))
+      assert(table.schema == new StructType().add("a", "int").add("b", "int"))
       assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/301fb0d7/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index f3c849b..195fce8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -33,10 +33,10 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.types.StructField
 
 
 private[hive] case class MetastoreRelation(
@@ -61,8 +61,8 @@ private[hive] case class MetastoreRelation(
 
   override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
 
-  private def toHiveColumn(c: CatalogColumn): FieldSchema = {
-    new FieldSchema(c.name, c.dataType, c.comment.orNull)
+  private def toHiveColumn(c: StructField): FieldSchema = {
+    new FieldSchema(c.name, c.dataType.catalogString, c.getComment.orNull)
   }
 
   // TODO: merge this with HiveClientImpl#toHiveTable
@@ -200,17 +200,17 @@ private[hive] case class MetastoreRelation(
     hiveQlTable.getMetadata
   )
 
-  implicit class SchemaAttribute(f: CatalogColumn) {
+  implicit class SchemaAttribute(f: StructField) {
     def toAttribute: AttributeReference = AttributeReference(
       f.name,
-      CatalystSqlParser.parseDataType(f.dataType),
+      f.dataType,
       // Since data can be dumped in randomly with no validation, everything is nullable.
       nullable = true
     )(qualifier = Some(tableName))
   }
 
   /** PartitionKey attributes */
-  val partitionKeys = catalogTable.partitionColumns.map(_.toAttribute)
+  val partitionKeys = catalogTable.partitionSchema.map(_.toAttribute)
 
   /** Non-partitionKey attributes */
   // TODO: just make this hold the schema itself, not just non-partition columns

http://git-wip-us.apache.org/repos/asf/spark/blob/301fb0d7/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 2392cc0..ef69ac7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -43,8 +43,10 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPa
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.util.{CircularBuffer, Utils}
 
 /**
@@ -336,7 +338,7 @@ private[hive] class HiveClientImpl(
       // Note: Hive separates partition columns and the schema, but for us the
       // partition columns are part of the schema
       val partCols = h.getPartCols.asScala.map(fromHiveColumn)
-      val schema = h.getCols.asScala.map(fromHiveColumn) ++ partCols
+      val schema = StructType(h.getCols.asScala.map(fromHiveColumn) ++ partCols)
 
       // Skew spec, storage handler, and bucketing info can't be mapped to CatalogTable (yet)
       val unsupportedFeatures = ArrayBuffer.empty[String]
@@ -721,16 +723,22 @@ private[hive] class HiveClientImpl(
     Utils.classForName(name)
       .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
 
-  private def toHiveColumn(c: CatalogColumn): FieldSchema = {
-    new FieldSchema(c.name, c.dataType, c.comment.orNull)
+  private def toHiveColumn(c: StructField): FieldSchema = {
+    new FieldSchema(c.name, c.dataType.catalogString, c.getComment().orNull)
   }
 
-  private def fromHiveColumn(hc: FieldSchema): CatalogColumn = {
-    new CatalogColumn(
+  private def fromHiveColumn(hc: FieldSchema): StructField = {
+    val columnType = try {
+      CatalystSqlParser.parseDataType(hc.getType)
+    } catch {
+      case e: ParseException =>
+        throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
+    }
+    val field = StructField(
       name = hc.getName,
-      dataType = hc.getType,
-      nullable = true,
-      comment = Option(hc.getComment))
+      dataType = columnType,
+      nullable = true)
+    Option(hc.getComment).map(field.withComment).getOrElse(field)
   }
 
   private def toHiveTable(table: CatalogTable): HiveTable = {

http://git-wip-us.apache.org/repos/asf/spark/blob/301fb0d7/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 2762e0c..678bf8d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.execution
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.hive.MetastoreRelation
@@ -65,9 +65,7 @@ case class CreateHiveTableAsSelectCommand(
       val withSchema = if (withFormat.schema.isEmpty) {
         // Hive doesn't support specifying the column list for target table in CTAS
         // However we don't think SparkSQL should follow that.
-        tableDesc.copy(schema = query.output.map { c =>
-          CatalogColumn(c.name, c.dataType.catalogString)
-        })
+        tableDesc.copy(schema = query.output.toStructType)
       } else {
         withFormat
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/301fb0d7/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 5450fba..e0c07db 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans
 import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.types.StructType
 
 class HiveDDLCommandSuite extends PlanTest {
   val parser = TestHive.sessionState.sqlParser
@@ -67,7 +68,7 @@ class HiveDDLCommandSuite extends PlanTest {
     // TODO will be SQLText
     assert(desc.viewText.isEmpty)
     assert(desc.viewOriginalText.isEmpty)
-    assert(desc.partitionColumns == Seq.empty[CatalogColumn])
+    assert(desc.partitionColumnNames.isEmpty)
     assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
     assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
     assert(desc.storage.serde ==
@@ -98,7 +99,7 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(desc.comment == Some("This is the staging page view table"))
     assert(desc.viewText.isEmpty)
     assert(desc.viewOriginalText.isEmpty)
-    assert(desc.partitionColumns == Seq.empty[CatalogColumn])
+    assert(desc.partitionColumnNames.isEmpty)
     assert(desc.storage.properties == Map())
     assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat"))
     assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat"))
@@ -114,7 +115,7 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(desc.identifier.table == "page_view")
     assert(desc.tableType == CatalogTableType.MANAGED)
     assert(desc.storage.locationUri == None)
-    assert(desc.schema == Seq.empty[CatalogColumn])
+    assert(desc.schema.isEmpty)
     assert(desc.viewText == None) // TODO will be SQLText
     assert(desc.viewOriginalText.isEmpty)
     assert(desc.storage.properties == Map())
@@ -150,7 +151,7 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(desc.identifier.table == "ctas2")
     assert(desc.tableType == CatalogTableType.MANAGED)
     assert(desc.storage.locationUri == None)
-    assert(desc.schema == Seq.empty[CatalogColumn])
+    assert(desc.schema.isEmpty)
     assert(desc.viewText == None) // TODO will be SQLText
     assert(desc.viewOriginalText.isEmpty)
     assert(desc.storage.properties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2")))
@@ -291,7 +292,7 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(desc.identifier.database.isEmpty)
     assert(desc.identifier.table == "my_table")
     assert(desc.tableType == CatalogTableType.MANAGED)
-    assert(desc.schema == Seq(CatalogColumn("id", "int"), CatalogColumn("name", "string")))
+    assert(desc.schema == new StructType().add("id", "int").add("name", "string"))
     assert(desc.partitionColumnNames.isEmpty)
     assert(desc.bucketSpec.isEmpty)
     assert(desc.viewText.isEmpty)
@@ -342,10 +343,10 @@ class HiveDDLCommandSuite extends PlanTest {
   test("create table - partitioned columns") {
     val query = "CREATE TABLE my_table (id int, name string) PARTITIONED BY (month int)"
     val (desc, _) = extractTableDesc(query)
-    assert(desc.schema == Seq(
-      CatalogColumn("id", "int"),
-      CatalogColumn("name", "string"),
-      CatalogColumn("month", "int")))
+    assert(desc.schema == new StructType()
+      .add("id", "int")
+      .add("name", "string")
+      .add("month", "int"))
     assert(desc.partitionColumnNames == Seq("month"))
   }
 
@@ -446,10 +447,10 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(desc.identifier.database == Some("dbx"))
     assert(desc.identifier.table == "my_table")
     assert(desc.tableType == CatalogTableType.EXTERNAL)
-    assert(desc.schema == Seq(
-      CatalogColumn("id", "int"),
-      CatalogColumn("name", "string"),
-      CatalogColumn("month", "int")))
+    assert(desc.schema == new StructType()
+      .add("id", "int")
+      .add("name", "string")
+      .add("month", "int"))
     assert(desc.partitionColumnNames == Seq("month"))
     assert(desc.bucketSpec.isEmpty)
     assert(desc.viewText.isEmpty)

http://git-wip-us.apache.org/repos/asf/spark/blob/301fb0d7/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 754aabb..9d72367 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
-import org.apache.spark.sql.types.{DecimalType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{DecimalType, IntegerType, StringType, StructField, StructType}
 
 class HiveMetastoreCatalogSuite extends TestHiveSingleton {
   import spark.implicits._
@@ -102,7 +102,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
 
         val columns = hiveTable.schema
         assert(columns.map(_.name) === Seq("d1", "d2"))
-        assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string"))
+        assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType))
 
         checkAnswer(table("t"), testDF)
         assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
@@ -135,7 +135,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
 
           val columns = hiveTable.schema
           assert(columns.map(_.name) === Seq("d1", "d2"))
-          assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string"))
-          assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType))
 
           checkAnswer(table("t"), testDF)
           assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") ===
@@ -166,7 +166,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
 
           val columns = hiveTable.schema
           assert(columns.map(_.name) === Seq("d1", "d2"))
-          assert(columns.map(_.dataType) === Seq("int", "string"))
+          assert(columns.map(_.dataType) === Seq(IntegerType, StringType))
 
           checkAnswer(table("t"), Row(1, "val_1"))
           assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1\tval_1"))

http://git-wip-us.apache.org/repos/asf/spark/blob/301fb0d7/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 571cae0..c87bda9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -726,7 +726,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       val hiveTable = CatalogTable(
         identifier = TableIdentifier(tableName, Some("default")),
         tableType = CatalogTableType.MANAGED,
-        schema = Seq.empty,
+        schema = new StructType,
         storage = CatalogStorageFormat(
           locationUri = None,
           inputFormat = None,
@@ -998,7 +998,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       // As a proxy for verifying that the table was stored in Hive compatible format,
       // we verify that each column of the table is of native type StringType.
       assert(sharedState.externalCatalog.getTable("default", "not_skip_hive_metadata").schema
-        .forall(column => CatalystSqlParser.parseDataType(column.dataType) == StringType))
+        .forall(_.dataType == StringType))
 
       createDataSourceTable(
         sparkSession = spark,
@@ -1013,8 +1013,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       // As a proxy for verifying that the table was stored in SparkSQL format,
       // we verify that the table has a column type as array of StringType.
       assert(sharedState.externalCatalog.getTable("default", "skip_hive_metadata")
-        .schema.forall { c =>
-          CatalystSqlParser.parseDataType(c.dataType) == ArrayType(StringType) })
+        .schema.forall(_.dataType == ArrayType(StringType)))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/301fb0d7/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 066c3ff..a2509f2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.client
 import java.io.{ByteArrayOutputStream, File, PrintStream}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.apache.hadoop.mapred.TextInputFormat
@@ -32,10 +31,11 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.tags.ExtendedHiveTest
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
@@ -146,7 +146,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       CatalogTable(
         identifier = TableIdentifier(tableName, Some(database)),
         tableType = CatalogTableType.MANAGED,
-        schema = Seq(CatalogColumn("key", "int")),
+        schema = new StructType().add("key", "int"),
         storage = CatalogStorageFormat(
           locationUri = None,
           inputFormat = Some(classOf[TextInputFormat].getName),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
