spark git commit: [SPARK-14684][SPARK-15277][SQL] Partition Spec Validation in SessionCatalog and Checking Partition Spec Existence Before Dropping

2016-05-12 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 68617e1ad -> 9c5c9013d


[SPARK-14684][SPARK-15277][SQL] Partition Spec Validation in SessionCatalog and 
Checking Partition Spec Existence Before Dropping

## What changes were proposed in this pull request?
~~Currently, multiple partitions can be dropped with a single DDL command: 
Alter Table Drop Partition. However, the internal implementation can break 
atomicity: if an exception is hit while dropping one of the qualified 
partitions, only a subset of them ends up dropped.~~

~~This PR contains the following behavior changes:~~
~~- disallow dropping multiple partitions with a single command~~
~~- allow predicates in the partition specification and issue a nicer error 
message if a predicate's comparison operator is not `=`.~~
~~- verify the partition spec in SessionCatalog. This ensures that each partition 
spec in `Drop Partition` does not correspond to multiple partitions.~~

This PR has two major parts:
- Verify the partition spec in SessionCatalog to fix the following issue:
  ```scala
  sql(s"ALTER TABLE $externalTab DROP PARTITION (ds='2008-04-09', unknownCol='12')")
  ```
  The above example uses an invalid partition spec. Without this PR, we would 
drop all the partitions, because the Hive metastore's getPartitions API returns 
all the partitions when given an invalid spec. (A sketch of this validation 
appears after this list.)

- Re-implemented `dropPartitions` in `HiveClientImpl`. Now we always check that 
all the user-specified partition specs exist before attempting to drop any 
partitions. Previously, we started dropping partitions before the existence 
check of all the partition specs had completed. If any failure happens after we 
start dropping partitions, we log an error message indicating which partitions 
have been dropped and which have not.
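
For illustration, here is a minimal, self-contained sketch of this kind of spec 
validation. The helper names mirror the ones this PR adds to SessionCatalog, but 
the signatures, types, and error messages below are simplified assumptions, not 
the actual Spark code:

```scala
object PartitionSpecValidation {
  // Assumed stand-in for Spark's TablePartitionSpec (column name -> value).
  type TablePartitionSpec = Map[String, String]

  /** DROP PARTITION allows a partial spec, but every named column must be a
   *  real partition column; otherwise an invalid spec could match everything. */
  def requirePartialMatchedPartitionSpec(
      specs: Seq[TablePartitionSpec],
      partitionColumnNames: Seq[String]): Unit = {
    specs.foreach { spec =>
      if (!spec.keys.forall(partitionColumnNames.contains)) {
        throw new IllegalArgumentException(
          s"Partition spec is invalid: ($spec) must be contained within " +
            s"the defined partition columns (${partitionColumnNames.mkString(", ")})")
      }
    }
  }

  /** CREATE/RENAME PARTITION requires the spec to name every partition column. */
  def requireExactMatchedPartitionSpec(
      specs: Seq[TablePartitionSpec],
      partitionColumnNames: Seq[String]): Unit = {
    specs.foreach { spec =>
      if (spec.keys.toSeq.sorted != partitionColumnNames.sorted) {
        throw new IllegalArgumentException(
          s"Partition spec is invalid: ($spec) must match the partition " +
            s"columns (${partitionColumnNames.mkString(", ")}) exactly")
      }
    }
  }
}
```

With partition columns `Seq("ds", "hr")`, the spec 
`Map("ds" -> "2008-04-09", "unknownCol" -> "12")` from the example above fails 
the partial-match check instead of silently matching every partition.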

## How was this patch tested?
Modified the existing test cases and added new test cases.

Author: gatorsmile 
Author: xiaoli 
Author: Xiao Li 

Closes #12801 from gatorsmile/banDropMultiPart.

(cherry picked from commit be617f3d0695982f982006fdd79afe3e3730b4c4)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c5c9013
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c5c9013
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c5c9013

Branch: refs/heads/branch-2.0
Commit: 9c5c9013de1311b3175a6156fb90447f00c7a883
Parents: 68617e1
Author: gatorsmile 
Authored: Thu May 12 11:14:40 2016 -0700
Committer: Andrew Or 
Committed: Thu May 12 11:14:52 2016 -0700

--
 .../sql/catalyst/catalog/SessionCatalog.scala   |  47 +++-
 .../catalyst/catalog/ExternalCatalogSuite.scala |   6 +
 .../catalyst/catalog/SessionCatalogSuite.scala  | 116 ++-
 .../spark/sql/execution/command/DDLSuite.scala  |  78 ++---
 .../spark/sql/hive/client/HiveClientImpl.scala  |  50 +---
 .../spark/sql/hive/execution/HiveDDLSuite.scala |   9 +-
 6 files changed, 248 insertions(+), 58 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9c5c9013/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0fc4ab5..54b30d3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -510,6 +510,7 @@ class SessionCatalog(
       tableName: TableIdentifier,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = {
+    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
@@ -523,13 +524,14 @@ class SessionCatalog(
    */
   def dropPartitions(
       tableName: TableIdentifier,
-      parts: Seq[TablePartitionSpec],
+      specs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean): Unit = {
+    requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
-    externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists)
+    externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists)
   }

spark git commit: [SPARK-14684][SPARK-15277][SQL] Partition Spec Validation in SessionCatalog and Checking Partition Spec Existence Before Dropping

2016-05-12 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 470de743e -> be617f3d0


[SPARK-14684][SPARK-15277][SQL] Partition Spec Validation in SessionCatalog and 
Checking Partition Spec Existence Before Dropping

## What changes were proposed in this pull request?
~~Currently, multiple partitions can be dropped with a single DDL command: 
Alter Table Drop Partition. However, the internal implementation can break 
atomicity: if an exception is hit while dropping one of the qualified 
partitions, only a subset of them ends up dropped.~~

~~This PR contains the following behavior changes:~~
~~- disallow dropping multiple partitions with a single command~~
~~- allow predicates in the partition specification and issue a nicer error 
message if a predicate's comparison operator is not `=`.~~
~~- verify the partition spec in SessionCatalog. This ensures that each partition 
spec in `Drop Partition` does not correspond to multiple partitions.~~

This PR has two major parts:
- Verify the partition spec in SessionCatalog to fix the following issue:
  ```scala
  sql(s"ALTER TABLE $externalTab DROP PARTITION (ds='2008-04-09', unknownCol='12')")
  ```
  The above example uses an invalid partition spec. Without this PR, we would 
drop all the partitions, because the Hive metastore's getPartitions API returns 
all the partitions when given an invalid spec.

- Re-implemented `dropPartitions` in `HiveClientImpl`. Now we always check that 
all the user-specified partition specs exist before attempting to drop any 
partitions. Previously, we started dropping partitions before the existence 
check of all the partition specs had completed. If any failure happens after we 
start dropping partitions, we log an error message indicating which partitions 
have been dropped and which have not. (A sketch of this check-then-drop 
ordering appears after this list.)
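
For illustration, a minimal sketch of the check-then-drop ordering is below. 
The `PartitionCatalog` trait is a stand-in assumption for this sketch, not 
`HiveClientImpl`'s real interface, and the error handling is simplified:

```scala
object DropPartitionsSketch {
  // Assumed stand-in for Spark's TablePartitionSpec (column name -> value).
  type TablePartitionSpec = Map[String, String]

  // Stand-in for the metastore client; not HiveClientImpl's real API.
  trait PartitionCatalog {
    def partitionExists(table: String, spec: TablePartitionSpec): Boolean
    def dropPartition(table: String, spec: TablePartitionSpec): Unit
  }

  def dropPartitions(
      catalog: PartitionCatalog,
      table: String,
      specs: Seq[TablePartitionSpec],
      ignoreIfNotExists: Boolean): Unit = {
    // Phase 1: verify that every requested spec exists before dropping
    // anything, so a bad spec can no longer leave a partial drop behind.
    val missing = specs.filterNot(catalog.partitionExists(table, _))
    if (missing.nonEmpty && !ignoreIfNotExists) {
      throw new NoSuchElementException(
        s"partitions not found: ${missing.mkString("; ")}")
    }
    // Phase 2: drop the partitions that exist. If an unexpected failure
    // still occurs mid-way, report which specs were and were not dropped.
    val toDrop = specs.diff(missing)
    var dropped = Vector.empty[TablePartitionSpec]
    toDrop.foreach { spec =>
      try {
        catalog.dropPartition(table, spec)
        dropped :+= spec
      } catch {
        case e: Exception =>
          System.err.println(
            s"dropped: ${dropped.mkString("; ")}; " +
              s"not dropped: ${toDrop.diff(dropped).mkString("; ")}")
          throw e
      }
    }
  }
}
```

The point of the two-phase shape is that existence errors surface before any 
state changes; only genuinely unexpected failures can interrupt phase 2.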

## How was this patch tested?
Modified the existing test cases and added new test cases.

Author: gatorsmile 
Author: xiaoli 
Author: Xiao Li 

Closes #12801 from gatorsmile/banDropMultiPart.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be617f3d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be617f3d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be617f3d

Branch: refs/heads/master
Commit: be617f3d0695982f982006fdd79afe3e3730b4c4
Parents: 470de74
Author: gatorsmile 
Authored: Thu May 12 11:14:40 2016 -0700
Committer: Andrew Or 
Committed: Thu May 12 11:14:40 2016 -0700

--
 .../sql/catalyst/catalog/SessionCatalog.scala   |  47 +++-
 .../catalyst/catalog/ExternalCatalogSuite.scala |   6 +
 .../catalyst/catalog/SessionCatalogSuite.scala  | 116 ++-
 .../spark/sql/execution/command/DDLSuite.scala  |  78 ++---
 .../spark/sql/hive/client/HiveClientImpl.scala  |  50 +---
 .../spark/sql/hive/execution/HiveDDLSuite.scala |   9 +-
 6 files changed, 248 insertions(+), 58 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/be617f3d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0fc4ab5..54b30d3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -510,6 +510,7 @@ class SessionCatalog(
       tableName: TableIdentifier,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = {
+    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
@@ -523,13 +524,14 @@ class SessionCatalog(
    */
   def dropPartitions(
       tableName: TableIdentifier,
-      parts: Seq[TablePartitionSpec],
+      specs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean): Unit = {
+    requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
-    externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists)
+    externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists)
   }
 
   /**
@@ -542,6 +544,9