Repository: spark
Updated Branches:
  refs/heads/master da3557429 -> a10b328db


[SPARK-22431][SQL] Ensure that the datatype in the schema for the table/view 
metadata is parseable by Spark before persisting it

## What changes were proposed in this pull request?
* JIRA: [SPARK-22431](https://issues.apache.org/jira/browse/SPARK-22431): Creating Permanent view with illegal type

**Description:**
- It is possible in Spark SQL to create a permanent view that uses a nested field with an illegal name.
- For example, if we create the following view:
```create view x as select struct('a' as `$q`, 1 as b) q```
- A simple select fails with the following exception:

```
select * from x;

org.apache.spark.SparkException: Cannot recognize hive type string: struct<$q:string,b:int>
  at org.apache.spark.sql.hive.client.HiveClientImpl$.fromHiveColumn(HiveClientImpl.scala:812)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:378)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:378)
...
```
**Issue/Analysis**: Right now, we can create a view with a schema that cannot be read back by Spark from the Hive metastore. For more details, please see the discussion of the analysis and the proposed fix options in comments 1 and 2 on [SPARK-22431](https://issues.apache.org/jira/browse/SPARK-22431).

**Proposed changes**:
 - Fix the Hive table/view code path to check whether the schema's data types are parseable by Spark before persisting them in the metastore. The change is localized to HiveClientImpl and performs a check similar to the existing one in fromHiveColumn. This is fail-fast: we avoid the scenario where we write something to the metastore that we are then unable to read back. (A sketch of the idea follows this list.)
 - Added new unit tests.
 - Ran the SQL-related unit test suites (hive/test, sql/test, catalyst/test) successfully.
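
For illustration only, here is a minimal, self-contained sketch of the fail-fast check; the actual patch adds a private `verifyColumnDataType` helper to `HiveClientImpl` (shown in the diff below), and the object and method names here are placeholders:

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.types.StructType

object SchemaSanityCheck {
  // Round-trip every column's type string through Spark's Catalyst parser
  // before the schema is persisted, so an unparseable type (e.g. a struct
  // field named `$a`) fails at DDL time instead of at read time.
  def verify(schema: StructType): Unit = {
    schema.foreach { col =>
      val typeString = col.dataType.catalogString
      try {
        CatalystSqlParser.parseDataType(typeString)
      } catch {
        case e: ParseException =>
          throw new SparkException(
            "Cannot recognize hive type string: " + typeString, e)
      }
    }
  }
}
```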

With the fix:
```
create view x as select struct('a' as `$q`, 1 as b) q;
17/11/28 10:44:55 ERROR SparkSQLDriver: Failed in [create view x as select struct('a' as `$q`, 1 as b) q]
org.apache.spark.SparkException: Cannot recognize hive type string: struct<$q:string,b:int>
        at org.apache.spark.sql.hive.client.HiveClientImpl$.org$apache$spark$sql$hive$client$HiveClientImpl$$getSparkSQLDataType(HiveClientImpl.scala:884)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$org$apache$spark$sql$hive$client$HiveClientImpl$$verifyColumnDataType$1.apply(HiveClientImpl.scala:906)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$org$apache$spark$sql$hive$client$HiveClientImpl$$verifyColumnDataType$1.apply(HiveClientImpl.scala:906)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
...
```
## How was this patch tested?
- New unit tests have been added.

hvanhovell, please review and share your thoughts/comments. Thank you so much.

Author: Sunitha Kambhampati <skam...@us.ibm.com>

Closes #19747 from skambha/spark22431.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a10b328d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a10b328d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a10b328d

Branch: refs/heads/master
Commit: a10b328dbc056aafaa696579f9a6e2b0cb8eb25f
Parents: da35574
Author: Sunitha Kambhampati <skam...@us.ibm.com>
Authored: Tue Nov 28 22:01:01 2017 +0100
Committer: Herman van Hovell <hvanhov...@databricks.com>
Committed: Tue Nov 28 22:01:01 2017 +0100

----------------------------------------------------------------------
 .../spark/sql/execution/command/DDLSuite.scala  | 15 ++++
 .../spark/sql/hive/client/HiveClientImpl.scala  | 17 +++-
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 82 ++++++++++++++++++++
 3 files changed, 111 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a10b328d/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 878f435..fdb9b2f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -117,6 +117,21 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with Befo
     }
   }
 
+  test("SPARK-22431: table with nested type col with special char") {
+    withTable("t") {
+      spark.sql("CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) USING 
PARQUET")
+      checkAnswer(spark.table("t"), Nil)
+    }
+  }
+
+  test("SPARK-22431: view with nested type") {
+    withView("t", "v") {
+      spark.sql("CREATE VIEW t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
+      checkAnswer(spark.table("t"), Row(Row("a", 1)) :: Nil)
+      spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
+      checkAnswer(spark.table("t"), Row(Row("a", 1)) :: Nil)
+    }
+  }
 }
 
 abstract class DDLSuite extends QueryTest with SQLTestUtils {

http://git-wip-us.apache.org/repos/asf/spark/blob/a10b328d/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index b5a5890..47ce6ba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -488,6 +488,7 @@ private[hive] class HiveClientImpl(
   }
 
   override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
+    verifyColumnDataType(table.dataSchema)
     client.createTable(toHiveTable(table, Some(userName)), ignoreIfExists)
   }
 
@@ -507,6 +508,7 @@ private[hive] class HiveClientImpl(
     // these properties are still available to the others that share the same Hive metastore.
     // If users explicitly alter these Hive-specific properties through ALTER TABLE DDL, we respect
     // these user-specified values.
+    verifyColumnDataType(table.dataSchema)
     val hiveTable = toHiveTable(
       table.copy(properties = table.ignoredProperties ++ table.properties), Some(userName))
     // Do not use `table.qualifiedName` here because this may be a rename
@@ -520,6 +522,7 @@ private[hive] class HiveClientImpl(
       newDataSchema: StructType,
       schemaProps: Map[String, String]): Unit = withHiveState {
     val oldTable = client.getTable(dbName, tableName)
+    verifyColumnDataType(newDataSchema)
     val hiveCols = newDataSchema.map(toHiveColumn)
     oldTable.setFields(hiveCols.asJava)
 
@@ -872,15 +875,19 @@ private[hive] object HiveClientImpl {
     new FieldSchema(c.name, typeString, c.getComment().orNull)
   }
 
-  /** Builds the native StructField from Hive's FieldSchema. */
-  def fromHiveColumn(hc: FieldSchema): StructField = {
-    val columnType = try {
+  /** Get the Spark SQL native DataType from Hive's FieldSchema. */
+  private def getSparkSQLDataType(hc: FieldSchema): DataType = {
+    try {
       CatalystSqlParser.parseDataType(hc.getType)
     } catch {
       case e: ParseException =>
         throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
     }
+  }
 
+  /** Builds the native StructField from Hive's FieldSchema. */
+  def fromHiveColumn(hc: FieldSchema): StructField = {
+    val columnType = getSparkSQLDataType(hc)
     val metadata = if (hc.getType != columnType.catalogString) {
       new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build()
     } else {
@@ -895,6 +902,10 @@ private[hive] object HiveClientImpl {
     Option(hc.getComment).map(field.withComment).getOrElse(field)
   }
 
+  private def verifyColumnDataType(schema: StructType): Unit = {
+    schema.foreach(col => getSparkSQLDataType(toHiveColumn(col)))
+  }
+
   private def toInputFormat(name: String) =
     Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a10b328d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index d3465a6..9063ef0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -174,6 +174,88 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
   test("alter datasource table add columns - partitioned - orc") {
     testAddColumnPartitioned("orc")
   }
+
+  test("SPARK-22431: illegal nested type") {
+    val queries = Seq(
+      "CREATE TABLE t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q",
+      "CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT)",
+      "CREATE VIEW t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
+
+    queries.foreach(query => {
+      val err = intercept[SparkException] {
+        spark.sql(query)
+      }.getMessage
+      assert(err.contains("Cannot recognize hive type string"))
+    })
+
+    withView("v") {
+      spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
+      checkAnswer(sql("SELECT q.`a`, q.b FROM v"), Row("a", 1) :: Nil)
+
+      val err = intercept[SparkException] {
+        spark.sql("ALTER VIEW v AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
+      }.getMessage
+      assert(err.contains("Cannot recognize hive type string"))
+    }
+  }
+
+  test("SPARK-22431: table with nested type") {
+    withTable("t", "x") {
+      spark.sql("CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) USING 
PARQUET")
+      checkAnswer(spark.table("t"), Nil)
+      spark.sql("CREATE TABLE x (q STRUCT<col1:INT, col2:STRING>, i1 INT)")
+      checkAnswer(spark.table("x"), Nil)
+    }
+  }
+
+  test("SPARK-22431: view with nested type") {
+    withView("v") {
+      spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
+      checkAnswer(spark.table("v"), Row(Row("a", 1)) :: Nil)
+
+      spark.sql("ALTER VIEW v AS SELECT STRUCT('a' AS `b`, 1 AS b) q1")
+      val df = spark.table("v")
+      assert("q1".equals(df.schema.fields(0).name))
+      checkAnswer(df, Row(Row("a", 1)) :: Nil)
+    }
+  }
+
+  test("SPARK-22431: alter table tests with nested types") {
+    withTable("t1", "t2", "t3") {
+      spark.sql("CREATE TABLE t1 (q STRUCT<col1:INT, col2:STRING>, i1 INT)")
+      spark.sql("ALTER TABLE t1 ADD COLUMNS (newcol1 STRUCT<`col1`:STRING, 
col2:Int>)")
+      val newcol = spark.sql("SELECT * FROM t1").schema.fields(2).name
+      assert("newcol1".equals(newcol))
+
+      spark.sql("CREATE TABLE t2(q STRUCT<`a`:INT, col2:STRING>, i1 INT) USING 
PARQUET")
+      spark.sql("ALTER TABLE t2 ADD COLUMNS (newcol1 STRUCT<`$col1`:STRING, 
col2:Int>)")
+      spark.sql("ALTER TABLE t2 ADD COLUMNS (newcol2 STRUCT<`col1`:STRING, 
col2:Int>)")
+
+      val df2 = spark.table("t2")
+      checkAnswer(df2, Nil)
+      assert("newcol1".equals(df2.schema.fields(2).name))
+      assert("newcol2".equals(df2.schema.fields(3).name))
+
+      spark.sql("CREATE TABLE t3(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) 
USING PARQUET")
+      spark.sql("ALTER TABLE t3 ADD COLUMNS (newcol1 STRUCT<`$col1`:STRING, 
col2:Int>)")
+      spark.sql("ALTER TABLE t3 ADD COLUMNS (newcol2 STRUCT<`col1`:STRING, 
col2:Int>)")
+
+      val df3 = spark.table("t3")
+      checkAnswer(df3, Nil)
+      assert("newcol1".equals(df3.schema.fields(2).name))
+      assert("newcol2".equals(df3.schema.fields(3).name))
+    }
+  }
+
+  test("SPARK-22431: negative alter table tests with nested types") {
+    withTable("t1") {
+      spark.sql("CREATE TABLE t1 (q STRUCT<col1:INT, col2:STRING>, i1 INT)")
+      val err = intercept[SparkException] {
+        spark.sql("ALTER TABLE t1 ADD COLUMNS (newcol1 STRUCT<`$col1`:STRING, 
col2:Int>)")
+      }.getMessage
+      assert(err.contains("Cannot recognize hive type string:"))
+   }
+  }
 }
 
 class HiveDDLSuite

