spark git commit: [SPARK-14684][SPARK-15277][SQL] Partition Spec Validation in SessionCatalog and Checking Partition Spec Existence Before Dropping
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 68617e1ad -> 9c5c9013d


[SPARK-14684][SPARK-15277][SQL] Partition Spec Validation in SessionCatalog and Checking Partition Spec Existence Before Dropping

#### What changes were proposed in this pull request?

~~Currently, a single DDL command, `ALTER TABLE ... DROP PARTITION`, can drop multiple partitions. However, the internal implementation could break atomicity: if an exception is hit while dropping one of the qualified partitions, only a subset of them ends up dropped.~~

~~This PR contains the following behavior changes:~~
~~- disallow dropping multiple partitions with a single command~~
~~- allow users to input predicates in the partition specification and issue a nicer error message if the predicate's comparison operator is not `=`.~~
~~- verify the partition spec in SessionCatalog. This ensures that each partition spec in `DROP PARTITION` does not correspond to multiple partitions.~~

This PR has two major parts:
- Verify the partition spec in SessionCatalog, to fix the following issue:
```scala
sql(s"ALTER TABLE $externalTab DROP PARTITION (ds='2008-04-09', unknownCol='12')")
```
The example above uses an invalid partition spec. Without this PR, we would drop all the partitions, because the Hive metastore's getPartitions API returns all partitions when given an invalid spec.
- Re-implement `dropPartitions` in `HiveClientImpl`. Now we always check that all the user-specified partition specs exist before attempting to drop any of them. Previously, we started dropping partitions before finishing the existence check for all the specs. If a failure happens after dropping has started, we log an error message indicating which partitions have been dropped and which have not.

#### How was this patch tested?

Modified the existing test cases and added new test cases.
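The first part of the change boils down to two validation rules: an "exact" spec (used when adding partitions) must assign a value to every partition column, while a "partial" spec (used when dropping) may name a subset of the partition columns, but in both cases every named column must be a real partition column. A minimal standalone sketch of those rules — all names here are hypothetical illustrations, not the actual Spark helpers:

```scala
// Hypothetical sketch of the two validation rules described above; these are
// not the actual SessionCatalog helpers, just an illustration of the checks.
object PartitionSpecValidation {
  type TablePartitionSpec = Map[String, String]

  // An exact spec (e.g. for ADD PARTITION) must cover every partition column.
  def requireExactMatch(spec: TablePartitionSpec, partCols: Seq[String]): Unit = {
    require(spec.keySet == partCols.toSet,
      s"Partition spec (${spec.keys.mkString(", ")}) must match " +
        s"the partition columns (${partCols.mkString(", ")})")
  }

  // A partial spec (e.g. for DROP PARTITION) may cover a subset of the
  // partition columns, but may not reference an unknown column.
  def requirePartialMatch(spec: TablePartitionSpec, partCols: Seq[String]): Unit = {
    require(spec.keySet.subsetOf(partCols.toSet),
      s"Unknown partition columns: ${(spec.keySet -- partCols).mkString(", ")}")
  }
}
```

With partition columns `Seq("ds", "hr")`, a spec like `Map("ds" -> "2008-04-09", "unknownCol" -> "12")` now fails both checks up front, instead of being passed to the metastore where it would silently match every partition.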
Author: gatorsmile
Author: xiaoli
Author: Xiao Li

Closes #12801 from gatorsmile/banDropMultiPart.

(cherry picked from commit be617f3d0695982f982006fdd79afe3e3730b4c4)
Signed-off-by: Andrew Or

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c5c9013
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c5c9013
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c5c9013

Branch: refs/heads/branch-2.0
Commit: 9c5c9013de1311b3175a6156fb90447f00c7a883
Parents: 68617e1
Author: gatorsmile
Authored: Thu May 12 11:14:40 2016 -0700
Committer: Andrew Or
Committed: Thu May 12 11:14:52 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   |  47 +++-
 .../catalyst/catalog/ExternalCatalogSuite.scala |   6 +
 .../catalyst/catalog/SessionCatalogSuite.scala  | 116 ++-
 .../spark/sql/execution/command/DDLSuite.scala  |  78 ++---
 .../spark/sql/hive/client/HiveClientImpl.scala  |  50 +---
 .../spark/sql/hive/execution/HiveDDLSuite.scala |   9 +-
 6 files changed, 248 insertions(+), 58 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/9c5c9013/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0fc4ab5..54b30d3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -510,6 +510,7 @@ class SessionCatalog(
       tableName: TableIdentifier,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = {
+    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
@@ -523,13 +524,14 @@ class SessionCatalog(
    */
  def dropPartitions(
      tableName: TableIdentifier,
-     parts: Seq[TablePartitionSpec],
+     specs: Seq[TablePartitionSpec],
      ignoreIfNotExists: Boolean): Unit = {
+    requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
    val table = formatTableName(tableName.table)
    requireDbExists(db)
    requireTableExists(TableIdentifier(table, Option(db)))
-    externalCatalog.dropPartitions(db, table,
spark git commit: [SPARK-14684][SPARK-15277][SQL] Partition Spec Validation in SessionCatalog and Checking Partition Spec Existence Before Dropping
Repository: spark
Updated Branches:
  refs/heads/master 470de743e -> be617f3d0


[SPARK-14684][SPARK-15277][SQL] Partition Spec Validation in SessionCatalog and Checking Partition Spec Existence Before Dropping

#### What changes were proposed in this pull request?

~~Currently, a single DDL command, `ALTER TABLE ... DROP PARTITION`, can drop multiple partitions. However, the internal implementation could break atomicity: if an exception is hit while dropping one of the qualified partitions, only a subset of them ends up dropped.~~

~~This PR contains the following behavior changes:~~
~~- disallow dropping multiple partitions with a single command~~
~~- allow users to input predicates in the partition specification and issue a nicer error message if the predicate's comparison operator is not `=`.~~
~~- verify the partition spec in SessionCatalog. This ensures that each partition spec in `DROP PARTITION` does not correspond to multiple partitions.~~

This PR has two major parts:
- Verify the partition spec in SessionCatalog, to fix the following issue:
```scala
sql(s"ALTER TABLE $externalTab DROP PARTITION (ds='2008-04-09', unknownCol='12')")
```
The example above uses an invalid partition spec. Without this PR, we would drop all the partitions, because the Hive metastore's getPartitions API returns all partitions when given an invalid spec.
- Re-implement `dropPartitions` in `HiveClientImpl`. Now we always check that all the user-specified partition specs exist before attempting to drop any of them. Previously, we started dropping partitions before finishing the existence check for all the specs. If a failure happens after dropping has started, we log an error message indicating which partitions have been dropped and which have not.

#### How was this patch tested?

Modified the existing test cases and added new test cases.
Author: gatorsmile
Author: xiaoli
Author: Xiao Li

Closes #12801 from gatorsmile/banDropMultiPart.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be617f3d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be617f3d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be617f3d

Branch: refs/heads/master
Commit: be617f3d0695982f982006fdd79afe3e3730b4c4
Parents: 470de74
Author: gatorsmile
Authored: Thu May 12 11:14:40 2016 -0700
Committer: Andrew Or
Committed: Thu May 12 11:14:40 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   |  47 +++-
 .../catalyst/catalog/ExternalCatalogSuite.scala |   6 +
 .../catalyst/catalog/SessionCatalogSuite.scala  | 116 ++-
 .../spark/sql/execution/command/DDLSuite.scala  |  78 ++---
 .../spark/sql/hive/client/HiveClientImpl.scala  |  50 +---
 .../spark/sql/hive/execution/HiveDDLSuite.scala |   9 +-
 6 files changed, 248 insertions(+), 58 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/be617f3d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0fc4ab5..54b30d3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -510,6 +510,7 @@ class SessionCatalog(
       tableName: TableIdentifier,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = {
+    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
@@ -523,13 +524,14 @@ class SessionCatalog(
    */
  def dropPartitions(
      tableName: TableIdentifier,
-     parts: Seq[TablePartitionSpec],
+     specs: Seq[TablePartitionSpec],
      ignoreIfNotExists: Boolean): Unit = {
+    requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
    val table = formatTableName(tableName.table)
    requireDbExists(db)
    requireTableExists(TableIdentifier(table, Option(db)))
-    externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists)
+    externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists)
  }

  /**
@@ -542,6 +544,9
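The second part of the change — resolve every user-specified spec against the existing partitions and fail before dropping anything if a spec matches nothing — can be sketched as follows. This is a hypothetical standalone illustration of the pattern, not the actual `HiveClientImpl` code; the `drop` callback stands in for the real metastore call.

```scala
// Hypothetical sketch of the "check all specs before dropping any" pattern
// described above. Partitions and specs are modeled as column -> value maps;
// the real implementation talks to the Hive metastore instead.
object DropPartitions {
  type Spec = Map[String, String]

  def dropPartitions(
      existing: Seq[Spec],
      toDrop: Seq[Spec],
      ignoreIfNotExists: Boolean)(drop: Spec => Unit): Unit = {
    // A (possibly partial) spec matches every existing partition that agrees
    // with it on all of the spec's columns.
    val matched = toDrop.map { s =>
      s -> existing.filter(p => s.forall { case (k, v) => p.get(k).contains(v) })
    }
    val missing = matched.collect { case (s, ms) if ms.isEmpty => s }
    // Fail up front, before any partition is dropped, if some spec matched
    // nothing and the caller did not ask to ignore missing partitions.
    if (missing.nonEmpty && !ignoreIfNotExists) {
      throw new NoSuchElementException(s"Partitions not found: $missing")
    }
    matched.flatMap(_._2).distinct.foreach(drop)
  }
}
```

Because the existence check for every spec completes before the first drop, a bad spec no longer leaves the table with a partially dropped set of partitions.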