Repository: spark
Updated Branches:
  refs/heads/master 188321623 -> fd4ba3f62


[SPARK-17192][SQL] Issue Exception when Users Specify the Partitioning Columns 
without a Given Schema

### What changes were proposed in this pull request?
Address the comments by yhuai in the original PR: 
https://github.com/apache/spark/pull/14207

First, issue an exception instead of logging a warning when users specify the 
partitioning columns without a given schema.

Second, refactor the code a little.
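
For illustration, a minimal sketch of the statement shape that now fails analysis, modeled
on the DDLSuite test in the diff below (the table name and path are placeholders, and
`spark` is assumed to be an active SparkSession): no schema is given, so the PARTITIONED BY
clause raises an AnalysisException instead of being dropped with a warning.

    // Hypothetical example; before this change the partition columns were silently
    // ignored with a warning, now the query fails during analysis.
    spark.sql(
      """
        |CREATE TABLE tab1
        |USING parquet
        |OPTIONS (
        |  path '/tmp/existing_parquet_dir'
        |)
        |PARTITIONED BY (a)
      """.stripMargin)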

### How was this patch tested?
Fixed the test cases.

Author: gatorsmile <gatorsm...@gmail.com>

Closes #14572 from gatorsmile/followup16552.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fd4ba3f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fd4ba3f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fd4ba3f6

Branch: refs/heads/master
Commit: fd4ba3f626f49d7d616a2a334d45b1c736e1db1c
Parents: 1883216
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Fri Aug 26 11:13:38 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Fri Aug 26 11:13:38 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/datasources/rules.scala | 25 +++++++-------------
 .../spark/sql/execution/command/DDLSuite.scala  | 17 +++++++++----
 .../spark/sql/hive/HiveExternalCatalog.scala    | 16 +++++++------
 3 files changed, 29 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fd4ba3f6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 5eb2f0a..f14c63c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -72,29 +72,20 @@ case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     // When we CREATE TABLE without specifying the table schema, we should fail the query if
-    // bucketing information is specified, as we can't infer bucketing from data files currently,
-    // and we should ignore the partition columns if it's specified, as we will infer it later, at
-    // runtime.
+    // bucketing information is specified, as we can't infer bucketing from data files currently.
+    // Since the runtime inferred partition columns could be different from what user specified,
+    // we fail the query if the partitioning information is specified.
     case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty =>
       if (tableDesc.bucketSpec.isDefined) {
         failAnalysis("Cannot specify bucketing information if the table schema 
is not specified " +
           "when creating and will be inferred at runtime")
       }
-
-      val partitionColumnNames = tableDesc.partitionColumnNames
-      if (partitionColumnNames.nonEmpty) {
-        // The table does not have a specified schema, which means that the schema will be inferred
-        // at runtime. So, we are not expecting partition columns and we will discover partitions
-        // at runtime. However, if there are specified partition columns, we simply ignore them and
-        // provide a warning message.
-        logWarning(
-          s"Specified partition columns (${partitionColumnNames.mkString(",")}) will be " +
-            s"ignored. The schema and partition columns of table ${tableDesc.identifier} will " +
-            "be inferred.")
-        c.copy(tableDesc = tableDesc.copy(partitionColumnNames = Nil))
-      } else {
-        c
+      if (tableDesc.partitionColumnNames.nonEmpty) {
+        failAnalysis("It is not allowed to specify partition columns when the 
table schema is " +
+          "not defined. When the table schema is not provided, schema and 
partition columns " +
+          "will be inferred.")
       }
+      c
 
     // Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
     // config, and do various checks:

http://git-wip-us.apache.org/repos/asf/spark/blob/fd4ba3f6/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index e6ae422..b343454 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -265,7 +265,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         userSpecifiedPartitionCols.map(p => s"PARTITIONED BY ($p)").getOrElse("")
       val schemaClause = userSpecifiedSchema.map(s => s"($s)").getOrElse("")
       val uri = path.toURI
-      sql(
+      val sqlCreateTable =
         s"""
            |CREATE TABLE $tabName $schemaClause
            |USING parquet
@@ -273,11 +273,18 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
            |  path '$uri'
            |)
            |$partitionClause
-         """.stripMargin)
-      val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
+         """.stripMargin
+      if (userSpecifiedSchema.isEmpty && userSpecifiedPartitionCols.nonEmpty) {
+        val e = intercept[AnalysisException](sql(sqlCreateTable)).getMessage
+        assert(e.contains(
+          "not allowed to specify partition columns when the table schema is 
not defined"))
+      } else {
+        sql(sqlCreateTable)
+        val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
 
-      assert(expectedSchema == tableMetadata.schema)
-      assert(expectedPartitionCols == tableMetadata.partitionColumnNames)
+        assert(expectedSchema == tableMetadata.schema)
+        assert(expectedPartitionCols == tableMetadata.partitionColumnNames)
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fd4ba3f6/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 2586d11..7f50e38 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -622,24 +622,26 @@ object HiveExternalCatalog {
   def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
     val errorMessage = "Could not read schema from the hive metastore because it is corrupted."
     val props = metadata.properties
-    props.get(DATASOURCE_SCHEMA).map { schema =>
+    val schema = props.get(DATASOURCE_SCHEMA)
+    if (schema.isDefined) {
       // Originally, we used `spark.sql.sources.schema` to store the schema of a data source table.
       // After SPARK-6024, we removed this flag.
       // Although we are not using `spark.sql.sources.schema` any more, we need to still support.
-      DataType.fromJson(schema).asInstanceOf[StructType]
-    } getOrElse {
-      props.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
-        val parts = (0 until numParts.toInt).map { index =>
+      DataType.fromJson(schema.get).asInstanceOf[StructType]
+    } else {
+      val numSchemaParts = props.get(DATASOURCE_SCHEMA_NUMPARTS)
+      if (numSchemaParts.isDefined) {
+        val parts = (0 until numSchemaParts.get.toInt).map { index =>
           val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
           if (part == null) {
             throw new AnalysisException(errorMessage +
-              s" (missing part $index of the schema, $numParts parts are 
expected).")
+              s" (missing part $index of the schema, ${numSchemaParts.get} 
parts are expected).")
           }
           part
         }
         // Stick all parts back to a single schema string.
         DataType.fromJson(parts.mkString).asInstanceOf[StructType]
-      } getOrElse {
+      } else {
         throw new AnalysisException(errorMessage)
       }
     }
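
For context, the properties read above store the table schema as JSON, either whole or split
into numbered parts. A hedged sketch of the reconstruction (not the Spark implementation; the
literal property keys mirror the DATASOURCE_SCHEMA* constants in the diff but are assumptions
here):

    import org.apache.spark.sql.types.{DataType, StructType}

    // Sketch only: rebuild a StructType from table properties in which the schema JSON is
    // stored either whole or split into `numParts` numbered pieces (keys are assumed).
    def readSchemaSketch(props: Map[String, String]): Option[StructType] = {
      val json = props.get("spark.sql.sources.schema").orElse {
        props.get("spark.sql.sources.schema.numParts").map { numParts =>
          (0 until numParts.toInt)
            .map(i => props(s"spark.sql.sources.schema.part.$i"))
            .mkString
        }
      }
      json.map(j => DataType.fromJson(j).asInstanceOf[StructType])
    }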


