Repository: spark
Updated Branches:
  refs/heads/master b6a7aa4f7 -> 894235390


[SPARK-19151][SQL] DataFrameWriter.saveAsTable support hive overwrite

## What changes were proposed in this pull request?

After [SPARK-19107](https://issues.apache.org/jira/browse/SPARK-19107), we can now 
treat hive as a data source and create hive tables with DataFrameWriter and 
Catalog. However, the support is not yet complete; some cases are still 
unsupported.

This PR implements:
`DataFrameWriter.saveAsTable` now works with the hive format in `Overwrite` mode, as sketched below.
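
For illustration, a minimal sketch of the newly supported call path, mirroring the added test further down; it assumes a Hive-enabled `SparkSession`, and the table name `demo_t` is hypothetical:

```scala
import org.apache.spark.sql.SaveMode

// The hive format requires a session built with Hive support.
val spark = org.apache.spark.sql.SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._

// Create a hive serde table from a DataFrame.
Seq(1 -> "a").toDF("i", "j")
  .write.format("hive").option("fileFormat", "avro").saveAsTable("demo_t")

// Overwrite it: before this patch, hive serde tables rejected Overwrite mode.
// The table is dropped and re-created, so the fileFormat option may even
// change across overwrites, as the added test verifies.
Seq(2 -> "b").toDF("i", "j")
  .write.format("hive").mode(SaveMode.Overwrite)
  .option("fileFormat", "parquet").saveAsTable("demo_t")
```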

## How was this patch tested?
A unit test was added.

Author: windpiger <song...@outlook.com>

Closes #16549 from windpiger/saveAsTableWithHiveOverwrite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/89423539
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/89423539
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/89423539

Branch: refs/heads/master
Commit: 8942353905c354c4ce31b0d1a44d33feb3dcf737
Parents: b6a7aa4
Author: windpiger <song...@outlook.com>
Authored: Sat Jan 14 10:53:33 2017 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Sat Jan 14 10:53:33 2017 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  | 15 ++++++++----
 .../apache/spark/sql/hive/HiveStrategies.scala  |  9 ++++----
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 24 ++++++++++++++++----
 3 files changed, 34 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/89423539/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 82331fd..7fc03bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
@@ -380,17 +380,22 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")
 
       case (true, SaveMode.Overwrite) =>
-        // Get all input data source relations of the query.
+        // Get all input data source or hive relations of the query.
         val srcRelations = df.logicalPlan.collect {
           case LogicalRelation(src: BaseRelation, _, _) => src
+          case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable) =>
+            relation.catalogTable.identifier
         }
         EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
-          // Only do the check if the table is a data source table (the relation is a BaseRelation).
-          // TODO(cloud-fan): also check hive table relation here when we support overwrite mode
-          // for creating hive tables.
+          // check if the table is a data source table (the relation is a BaseRelation).
           case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
             throw new AnalysisException(
               s"Cannot overwrite table $tableName that is also being read from")
+          // check hive table relation when overwrite mode
+          case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable)
+            && srcRelations.contains(relation.catalogTable.identifier) =>
+            throw new AnalysisException(
+              s"Cannot overwrite table $tableName that is also being read from")
           case _ => // OK
         }
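
For context, the new `CatalogRelation` case above makes a self-overwrite of a hive table fail fast, matching the existing behavior for data source tables. A hedged sketch of the guarded behavior, reusing the `spark` session and hypothetical `demo_t` table from the sketch in the description:

```scala
// Reading and overwriting the same hive table in one query is rejected with
// "Cannot overwrite table ... that is also being read from".
spark.table("demo_t").write.format("hive")
  .mode(SaveMode.Overwrite).saveAsTable("demo_t")  // throws AnalysisException
```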
 

http://git-wip-us.apache.org/repos/asf/spark/blob/89423539/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 6d5cc57..d1f11e7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -109,12 +109,11 @@ private[hive] trait HiveStrategies {
           table, partition, planLater(child), overwrite, ifNotExists) :: Nil
 
       case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
-        // Currently we will never hit this branch, as SQL string API can only use `Ignore` or
-        // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde
-        // tables yet.
-        if (mode == SaveMode.Append || mode == SaveMode.Overwrite) {
+        // Currently `DataFrameWriter.saveAsTable` doesn't support
+        // the Append mode of hive serde tables yet.
+        if (mode == SaveMode.Append) {
           throw new AnalysisException(
-            "CTAS for hive serde tables does not support append or overwrite semantics.")
+            "CTAS for hive serde tables does not support append semantics.")
         }
 
         val dbName = tableDesc.identifier.database.getOrElse(sparkSession.catalog.currentDatabase)
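
For context, after this change only Append mode trips this CTAS check; Overwrite proceeds. As the updated comment notes, appending to a hive serde table via `saveAsTable` is still unsupported. A hedged sketch against the hypothetical `demo_t` table, reusing the imports from the first sketch:

```scala
// Still unsupported after this patch: Append mode on a hive serde table
// throws "CTAS for hive serde tables does not support append semantics."
Seq(3 -> "c").toDF("i", "j")
  .write.format("hive").mode(SaveMode.Append).saveAsTable("demo_t")
```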

http://git-wip-us.apache.org/repos/asf/spark/blob/89423539/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 0af331e..e3f1667 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1314,7 +1314,24 @@ class HiveDDLSuite
         .write.format("hive").option("fileFormat", "avro").saveAsTable("t")
       checkAnswer(spark.table("t"), Row(1, "a"))
 
-      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+      Seq("c" -> 1).toDF("i", "j").write.format("hive")
+        .mode(SaveMode.Overwrite).option("fileFormat", "parquet").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row("c", 1))
+
+      var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+      assert(DDLUtils.isHiveTable(table))
+      assert(table.storage.inputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
+      assert(table.storage.outputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+      assert(table.storage.serde ==
+        Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+
+      Seq(9 -> "x").toDF("i", "j")
+        .write.format("hive").mode(SaveMode.Overwrite).option("fileFormat", 
"avro").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(9, "x"))
+
+      table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
       assert(DDLUtils.isHiveTable(table))
       assert(table.storage.inputFormat ==
         Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"))
@@ -1324,7 +1341,7 @@ class HiveDDLSuite
         Some("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))
 
       sql("INSERT INTO t SELECT 2, 'b'")
-      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
+      checkAnswer(spark.table("t"), Row(9, "x") :: Row(2, "b") :: Nil)
 
       val e = intercept[AnalysisException] {
         Seq(1 -> "a").toDF("i", 
"j").write.format("hive").partitionBy("i").saveAsTable("t2")
@@ -1340,8 +1357,7 @@ class HiveDDLSuite
       val e3 = intercept[AnalysisException] {
         spark.table("t").write.format("hive").mode("overwrite").saveAsTable("t")
       }
-      assert(e3.message.contains(
-        "CTAS for hive serde tables does not support append or overwrite semantics"))
+      assert(e3.message.contains("Cannot overwrite table default.t that is also being read from"))
     }
   }
 

