[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-11-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19691


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-07-25 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r205047473
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand(
  *
  * The syntax of this command is:
  * {{{
- *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
+ *   ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1)
+ *     [, PARTITION (spec2, expr2), ...] [PURGE];
  * }}}
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitions: Seq[(TablePartitionSpec, Seq[Expression])],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {

   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")

-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val toDrop = partitions.flatMap { partition =>
+      if (partition._1.isEmpty && !partition._2.isEmpty) {
+        // There are only expressions in this drop condition.
+        extractFromPartitionFilter(partition._2, catalog, table, resolver)
+      } else if (!partition._1.isEmpty && partition._2.isEmpty) {
+        // There are only partitionSpecs in this drop condition.
+        extractFromPartitionSpec(partition._1, table, resolver)
+      } else if (!partition._1.isEmpty && !partition._2.isEmpty) {
+        // This drop condition has both partitionSpecs and expressions.
+        extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect(
--- End diff --

thank you @DazhuangSu 


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-07-25 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r205013855
  

@mgaido91 I understand your point; yes, it would be inefficient. I will work on this soon.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-07-16 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r202637455
  

@DazhuangSu sorry I missed your last comment somehow.

Why do you say it would not be inefficient if you have a lot of partitions? I think it would be! Imagine that you partition per year and day, and you want to get the first 6 months of this year. The spec would be something like `(year = 2018, day < 2018-07-01)`. Imagine we have a 10-year history. With the current implementation, we would get back basically all the partitions from the filter, i.e. roughly 3,650, and then intersect those. Anyway, my understanding is that such a case would not even work properly, as it would try to drop the intersect of:
```
Seq(Map("year" -> "2018", "day" -> "2018-01-01", ...)).intersect(Seq(Map("year" -> "2018")))
```
which would result in an empty Seq, so we would drop nothing. Moreover, I saw no test for this case. Can we add tests for this use case, and add support for it if my understanding that it is not working is right? Thanks


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-07 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r193691275
  

hi, @mgaido91 there is one problem after I changed the syntax: when I run the SQL `DROP PARTITION (p >= 2)` it throws
`org.apache.spark.sql.AnalysisException: cannot resolve 'p' given input columns: []`
I'm trying to find a way to fix it.

By the way, is a syntax like `((partitionVal (',' partitionVal)*) | (expression (',' expression)*))` legal? I wrote an ANTLR4 syntax test, but it didn't work as I expected.

Besides, I was wrong that day. I think the if conditions won't be inefficient if there are a lot of partitions; they may be inefficient if there are a lot of dropPartitionSpecs, which I don't think can happen easily.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-06 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r193358172
  

I think we can (must) just have a single `AlterTableDropPartitionCommand(tableName: TableIdentifier, partitionSpecs: Seq[TablePartitionSpec], partitionExprs: Seq[Seq[Expression]], ifExists: Boolean, purge: Boolean, retainData: Boolean)`. Indeed, we might have something like:
```
alter table foo drop partition (year=2017, month=12), partition(year=2018, month < 3);
```
where we have both a partition spec and an expression specification.
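
A compilable sketch of that proposed shape, with the catalyst types stubbed out so it stands alone (the real `TableIdentifier` and `Expression` live in org.apache.spark.sql.catalyst):
```
// Stubs standing in for the catalyst types, only so this sketch compiles alone.
final case class TableIdentifier(table: String)
sealed trait Expression

// One command carries both kinds of per-PARTITION clause: equality-only specs
// and expression-based filters, either of which may be empty for a given clause.
final case class AlterTableDropPartitionCommand(
    tableName: TableIdentifier,
    partitionSpecs: Seq[Map[String, String]],  // i.e. Seq[TablePartitionSpec]
    partitionExprs: Seq[Seq[Expression]],
    ifExists: Boolean,
    purge: Boolean,
    retainData: Boolean)
```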


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-06 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r193356194
  

I mean how to better define `AlterTableDropPartitionCommand` in `ddl.scala`. We need to handle both
```
AlterTableDropPartitionCommand(
    tableName: TableIdentifier,
    partitions: Seq[Seq[Expression]],
    ifExists: Boolean,
    purge: Boolean,
    retainData: Boolean)
```
and
```
AlterTableDropPartitionCommand(
    tableName: TableIdentifier,
    partitions: Seq[TablePartitionSpec],
    ifExists: Boolean,
    purge: Boolean,
    retainData: Boolean)
```
Maybe by distinguishing the different cases inside the method?


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-06 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r193347767
  

Why? Isn't something like the following enough?
```
((partitionVal (',' partitionVal)*) | (expression (',' expression)*))
```


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-05 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r193171992
  

Yeah, I agree. And the hard part may be how to convert a `partitionSpec` to an `EqualTo`.
I think it's better to let the `AstBuilder` handle this. If so, we may have to have two `AlterTableDropPartitionCommand` instances in `ddl.scala`, one for all `partitionSpec`s and one for all `expression`s. But that may be a bit weird.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-05 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r193002268
  

I think this may be quite inefficient if we have a lot of partitions. What about converting the `partitionSpec` into `EqualTo` expressions and adding them as conditions? It would be great IMO if we can achieve this by enforcing in the syntax that we have either all `partitionSpec`s or all `expression`s. So if we have all `partition = value`, we have a `partitionSpec`, while if at least one comparison is different from `=`, we have all `expression`s (including the `=`s). What do you think?
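
A small sketch of that classification rule, using plain (column, op, value) triples as stand-ins for parsed comparisons rather than Spark's parser API:
```
final case class Comparison(column: String, op: String, value: String)

// All comparisons are `=`  -> the clause is a plain partitionSpec.
// At least one is not `=`  -> the whole clause is kept as expressions.
def classify(clause: Seq[Comparison]): Either[Map[String, String], Seq[Comparison]] =
  if (clause.forall(_.op == "=")) Left(clause.map(c => c.column -> c.value).toMap)
  else Right(clause)

classify(Seq(Comparison("year", "=", "2018")))
// Left(Map(year -> 2018))
classify(Seq(Comparison("year", "=", "2018"), Comparison("day", "<", "2018-07-01")))
// Right(...): the single non-equality makes the clause expression-based
```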


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-05 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r193000314
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand(
  *
  * The syntax of this command is:
  * {{{
- *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, 
...] [PURGE];
+ *   ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1)
+ * [, PARTITION (spec2, expr2), ...] [PURGE];
  * }}}
  */
 case class AlterTableDropPartitionCommand(
 tableName: TableIdentifier,
-specs: Seq[TablePartitionSpec],
+partitions: Seq[(TablePartitionSpec, Seq[Expression])],
 ifExists: Boolean,
 purge: Boolean,
 retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
 val catalog = sparkSession.sessionState.catalog
 val table = catalog.getTableMetadata(tableName)
+val resolver = sparkSession.sessionState.conf.resolver
 DDLUtils.verifyAlterTableType(catalog, table, isView = false)
 DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER 
TABLE DROP PARTITION")
 
-val normalizedSpecs = specs.map { spec =>
-  PartitioningUtils.normalizePartitionSpec(
-spec,
-table.partitionColumnNames,
-table.identifier.quotedString,
-sparkSession.sessionState.conf.resolver)
+val toDrop = partitions.flatMap { partition =>
+  if (partition._1.isEmpty && !partition._2.isEmpty) {
+// There are only expressions in this drop condition.
+extractFromPartitionFilter(partition._2, catalog, table, resolver)
+  } else if (!partition._1.isEmpty && partition._2.isEmpty) {
+// There are only partitionSpecs in this drop condition.
+extractFromPartitionSpec(partition._1, table, resolver)
+  } else if (!partition._1.isEmpty && !partition._2.isEmpty) {
+// This drop condition has both partitionSpecs and expressions.
+extractFromPartitionFilter(partition._2, catalog, table, 
resolver).intersect(
+  extractFromPartitionSpec(partition._1, table, resolver))
+  } else {
+Seq.empty[TablePartitionSpec]
+  }
 }
 
 catalog.dropPartitions(
-  table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, 
purge = purge,
+  table.identifier, toDrop, ignoreIfNotExists = ifExists, purge = 
purge,
   retainData = retainData)
 
 CommandUtils.updateTableStats(sparkSession, table)
 
 Seq.empty[Row]
   }
 
+  private def extractFromPartitionSpec(
+  specs: TablePartitionSpec,
+  table: CatalogTable,
+  resolver: Resolver): Seq[Map[String, String]] = {
+Seq(PartitioningUtils.normalizePartitionSpec(
+  specs,
+  table.partitionColumnNames,
+  table.identifier.quotedString,
+  resolver))
+  }
+
+  private def extractFromPartitionFilter(
+  filters: Seq[Expression],
+  catalog: SessionCatalog,
+  table: CatalogTable,
+  resolver: Resolver): Seq[TablePartitionSpec] = {
+val expressions = filters.map { expr =>
+  val (attrName, constant) = expr match {
+case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: 
Literal) =>
+  (name, constant)
+  }
+  if (!table.partitionColumnNames.exists(resolver(_, attrName))) {
+throw new AnalysisException(s"${attrName} is not a valid partition 
column " +
+  s"in table ${table.identifier.quotedString}.")
+  }
+  val dataType = table.partitionSchema.apply(attrName).dataType
+  expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(),
+Cast(constant, dataType)))
--- End diff --

nit: can we add the cast only when needed, i.e. when `dataType != constant.dataType`?
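
A sketch of the guard this nit asks for; `castIfNeeded` is a hypothetical helper (not part of the PR), assuming Spark's catalyst module on the classpath:
```
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, Literal}
import org.apache.spark.sql.types.DataType

// Wrap in Cast only when the literal's type differs from the partition
// column's type; otherwise reuse the parsed literal directly.
def castIfNeeded(constant: Literal, dataType: DataType): Expression =
  if (constant.dataType == dataType) constant else Cast(constant, dataType)
```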


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-01 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192425378
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitions: Seq[(TablePartitionSpec, Seq[Expression])],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {

   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")

-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val toDrop = partitions.flatMap { partition =>
+      val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+        partition._1,
+        table.partitionColumnNames,
+        table.identifier.quotedString,
+        sparkSession.sessionState.conf.resolver)
+
+      val partitionSet = {
+        if (partition._2.nonEmpty) {
+          val parts = partition._2.map { expr =>
+            val (attrName, value) = expr match {
+              case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
+                (name, constant.value)
+            }
+            if (!table.partitionColumnNames.exists(resolver(_, attrName))) {
+              throw new AnalysisException(s"${attrName} is not a valid partition column " +
+                s"in table ${table.identifier.quotedString}.")
+            }
+            val dataType = table.partitionSchema.apply(attrName).dataType
+            expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(),
+              Cast(Literal(value.toString), dataType)))
+          }.reduce(And)
+
+          val partitions = catalog.listPartitionsByFilter(
+            table.identifier, Seq(parts)).map(_.spec)
+          if (partitions.isEmpty && !ifExists) {
--- End diff --

Sure, I will make this code more readable.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-01 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192416800
  

Oh, I see now; well, this is getting very involved... can we split the cases into different methods? I think we can have 4 cases, like:
```
if (partition._1.isEmpty && !partition._2.isEmpty) {
  // extract from partition._2
} else if (!partition._1.isEmpty && partition._2.isEmpty) {
  // extract from partition._1
} else if (!partition._1.isEmpty && !partition._2.isEmpty) {
  // intersect
} else {
  // return empty seq
}
```
Maybe with some comments to explain when each of these cases can happen. Thanks.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-01 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192399827
  

lol. you are right. I will update


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-01 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192388998
  

I'm a little confused. If we return `Seq.empty` to `partitionSet` for both cases, then the code will go to line 570 in both cases.
How can we return empty to `toDrop` for the first case and return `partitionVal1` for the second case at this line?


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-01 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192382198
  

I see, but what about `Cast(constant, dataType)`?


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-01 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192380851
  

If we use the parsed `constant` and don't cast it to the partition's dataType, it throws an exception:
```
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
  at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
  at scala.math.Ordering$Long$.compare(Ordering.scala:264)
  at scala.math.Ordering$class.gteq(Ordering.scala:91)
  at scala.math.Ordering$Long$.gteq(Ordering.scala:264)
  at org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual.nullSafeEval(predicates.scala:710)
  at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:423)
```
for the case
`CREATE TABLE tbl_x (a INT) PARTITIONED BY (p LONG)`
`ALTER TABLE tbl_x DROP PARTITION (p >= 1)`
that I mentioned above.
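
The same boxing mismatch can be reproduced outside Spark: an `Ordering[Long]` reached through its erased interface (as in `nullSafeEval`) unboxes its arguments to `Long`, so a boxed `Integer` blows up (a minimal sketch):
```
import scala.util.Try

val ord = Ordering.Long.asInstanceOf[Ordering[Any]]  // erased, as inside nullSafeEval
val intBoxed: Any = Integer.valueOf(1)               // what an un-cast INT literal yields
val longBoxed: Any = java.lang.Long.valueOf(1L)      // a LONG partition value

println(Try(ord.gteq(longBoxed, intBoxed)))
// Failure(java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long)
```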


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-01 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192367934
  

Yes, but in the first case `toDrop` would be empty, and in the second case it would contain `partitionVal1`. So when it is later passed to `dropPartitions`, that method checks whether it is empty or not.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-01 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192367608
  

No, that change is not needed. Why create a new literal from the value? We can use the parsed literal; we don't have to change the `Literal` class.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-31 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192147689
  

Let me explain these two cases more clearly, with two SQL statements as examples (`useless_expression` means no partitions match the expression):
`ALTER TABLE DROP PARTITION(partitionVal1, useless_expression)`
`ALTER TABLE DROP PARTITION(partitionVal1)`

The first statement should drop the intersection of `partitionVal1` and `useless_expression`, which is empty. The second statement should drop partition `partitionVal1`.

If we return `Seq.empty` to `partitionSet` for both statements, it will be impossible to tell them apart later.
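
A standalone sketch of the two expected outcomes (hypothetical spec values):
```
val partitionVal1 = Map("p" -> "1")
val uselessExpressionMatches = Seq.empty[Map[String, String]]  // filter matched nothing

// First statement: spec intersected with the empty filter result -> drop nothing.
val firstToDrop = Seq(partitionVal1).intersect(uselessExpressionMatches)  // List()
// Second statement: the spec alone -> drop partitionVal1.
val secondToDrop = Seq(partitionVal1)  // List(Map(p -> 1))
```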


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-31 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192130248
  

OK, I get your point.
I just ran a quick test; it threw an exception "java.lang.RuntimeException: Unsupported literal type class org.apache.spark.unsafe.types.UTF8String" at this line when I ran:
`ALTER TABLE table_a DROP PARTITION(a < 'test')`
So a one-line change in `literals.scala` is needed: the method `def apply(v: Any): Literal` (literals.scala, line 52) only supports `String`, not `UTF8String`, for now.
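
A minimal reproduction of that failure, assuming the Spark 2.x catalyst module on the classpath:
```
import scala.util.Try
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.unsafe.types.UTF8String

// Literal.apply pattern-matches on java.lang.String but not on UTF8String,
// which is what the parser produces for the value of 'test'.
println(Try(Literal(UTF8String.fromString("test"))))
// Failure(java.lang.RuntimeException: Unsupported literal type class
// org.apache.spark.unsafe.types.UTF8String)
println(Literal(UTF8String.fromString("test").toString))  // fine: a String literal
```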



---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-31 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192117302
  

I agree, but this case definitely doesn't need to go through converting to a 
string, creating a string literal back, and casting to long. I think the cast 
is performed automatically; if it is not, we can just add the cast on the 
incoming constant. Do you agree?
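(For illustration, a minimal sketch of the direct cast being discussed; 
`LongType` stands in for the partition column's type, and Spark catalyst is 
assumed on the classpath:)

    import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
    import org.apache.spark.sql.types.LongType

    val constant  = Literal(1)                                        // INT literal from the parser
    val viaString = Cast(Literal(constant.value.toString), LongType)  // the string round-trip
    val direct    = Cast(constant, LongType)                          // cast the literal as-is
    assert(direct.eval() == 1L)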


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-31 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192115083
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
 tableName: TableIdentifier,
-specs: Seq[TablePartitionSpec],
+partitions: Seq[(TablePartitionSpec, Seq[Expression])],
 ifExists: Boolean,
 purge: Boolean,
 retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
 val catalog = sparkSession.sessionState.catalog
 val table = catalog.getTableMetadata(tableName)
+val resolver = sparkSession.sessionState.conf.resolver
 DDLUtils.verifyAlterTableType(catalog, table, isView = false)
 DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER 
TABLE DROP PARTITION")
 
-val normalizedSpecs = specs.map { spec =>
-  PartitioningUtils.normalizePartitionSpec(
-spec,
-table.partitionColumnNames,
-table.identifier.quotedString,
-sparkSession.sessionState.conf.resolver)
+val toDrop = partitions.flatMap { partition =>
+  val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+partition._1,
+table.partitionColumnNames,
+table.identifier.quotedString,
+sparkSession.sessionState.conf.resolver)
+
+  val partitionSet = {
+if (partition._2.nonEmpty) {
+  val parts = partition._2.map { expr =>
+val (attrName, value) = expr match {
+  case BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
+(name, constant.value)
+}
+if (!table.partitionColumnNames.exists(resolver(_, attrName))) 
{
+  throw new AnalysisException(s"${attrName} is not a valid 
partition column " +
+s"in table ${table.identifier.quotedString}.")
+}
+val dataType = table.partitionSchema.apply(attrName).dataType
+expr.withNewChildren(Seq(AttributeReference(attrName, 
dataType)(),
+  Cast(Literal(value.toString), dataType)))
--- End diff --

I think we can't throw an AnalysisException in every situation, e.g.
`CREATE TABLE tbl_x (a INT) PARTITIONED BY (p LONG)`
`ALTER TABLE tbl_x DROP PARTITION (p < 1)`
In this case, the partition column's dataType is `LONG` for sure, but the 
constant's dataType is `INT`.

I think it's reasonable to support at least this situation.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-31 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192100229
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
 tableName: TableIdentifier,
-specs: Seq[TablePartitionSpec],
+partitions: Seq[(TablePartitionSpec, Seq[Expression])],
 ifExists: Boolean,
 purge: Boolean,
 retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
 val catalog = sparkSession.sessionState.catalog
 val table = catalog.getTableMetadata(tableName)
+val resolver = sparkSession.sessionState.conf.resolver
 DDLUtils.verifyAlterTableType(catalog, table, isView = false)
 DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER 
TABLE DROP PARTITION")
 
-val normalizedSpecs = specs.map { spec =>
-  PartitioningUtils.normalizePartitionSpec(
-spec,
-table.partitionColumnNames,
-table.identifier.quotedString,
-sparkSession.sessionState.conf.resolver)
+val toDrop = partitions.flatMap { partition =>
+  val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+partition._1,
+table.partitionColumnNames,
+table.identifier.quotedString,
+sparkSession.sessionState.conf.resolver)
+
+  val partitionSet = {
+if (partition._2.nonEmpty) {
+  val parts = partition._2.map { expr =>
+val (attrName, value) = expr match {
+  case BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
+(name, constant.value)
+}
+if (!table.partitionColumnNames.exists(resolver(_, attrName))) 
{
+  throw new AnalysisException(s"${attrName} is not a valid 
partition column " +
+s"in table ${table.identifier.quotedString}.")
+}
+val dataType = table.partitionSchema.apply(attrName).dataType
+expr.withNewChildren(Seq(AttributeReference(attrName, 
dataType)(),
+  Cast(Literal(value.toString), dataType)))
+  }.reduce(And)
+
+  val partitions = catalog.listPartitionsByFilter(
+table.identifier, Seq(parts)).map(_.spec)
+  if (partitions.isEmpty && !ifExists) {
--- End diff --

Sorry, I don't really get what you mean. If we have no filters, we return an 
empty `Seq` (the check is at line 539). So here we are in case 1, i.e. there 
is a filter and it returns no partitions. If we drop this `if`, my 
understanding is that we return `partitions` - which is empty - to 
`partitionSet`. Then `toDrop` would also be empty. The result is that we call 
`dropPartitions` with an empty Seq, and it throws the `AnalysisException` 
there (instead of doing it here). So I think this check is redundant. Am I 
wrong?

PS: all these operations are becoming quite complex as inline statements. I 
think that extracting some methods to handle the different parts could improve 
readability. What do you think?


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-31 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192095829
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
 tableName: TableIdentifier,
-specs: Seq[TablePartitionSpec],
+partitions: Seq[(TablePartitionSpec, Seq[Expression])],
 ifExists: Boolean,
 purge: Boolean,
 retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
 val catalog = sparkSession.sessionState.catalog
 val table = catalog.getTableMetadata(tableName)
+val resolver = sparkSession.sessionState.conf.resolver
 DDLUtils.verifyAlterTableType(catalog, table, isView = false)
 DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER 
TABLE DROP PARTITION")
 
-val normalizedSpecs = specs.map { spec =>
-  PartitioningUtils.normalizePartitionSpec(
-spec,
-table.partitionColumnNames,
-table.identifier.quotedString,
-sparkSession.sessionState.conf.resolver)
+val toDrop = partitions.flatMap { partition =>
+  val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+partition._1,
+table.partitionColumnNames,
+table.identifier.quotedString,
+sparkSession.sessionState.conf.resolver)
+
+  val partitionSet = {
+if (partition._2.nonEmpty) {
+  val parts = partition._2.map { expr =>
+val (attrName, value) = expr match {
+  case BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
+(name, constant.value)
+}
+if (!table.partitionColumnNames.exists(resolver(_, attrName))) 
{
+  throw new AnalysisException(s"${attrName} is not a valid 
partition column " +
+s"in table ${table.identifier.quotedString}.")
+}
+val dataType = table.partitionSchema.apply(attrName).dataType
+expr.withNewChildren(Seq(AttributeReference(attrName, 
dataType)(),
+  Cast(Literal(value.toString), dataType)))
--- End diff --

Shouldn't we throw an `AnalysisException` if they have different datatypes? 
I think converting something to a string and back to the desired datatype is 
not a good approach, and it may cause issues; a sketch of the alternative 
follows below.
@gatorsmile what do you think?
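(A minimal sketch of that stricter alternative, with a plain exception 
standing in for `AnalysisException` to keep the snippet self-contained; the 
helper name is made up:)

    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.sql.types.DataType

    // Fail fast when the literal's type differs from the partition column's
    // type, instead of round-tripping through a string.
    def checkLiteralType(constant: Literal, columnType: DataType): Unit =
      if (constant.dataType != columnType) {
        // the real command would throw AnalysisException here
        throw new IllegalArgumentException(s"Literal of type " +
          s"${constant.dataType} does not match column type $columnType")
      }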


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-31 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192095262
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -293,6 +313,15 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
 }
   }
 
+  /**
+   * Create a partition specification map without optional values
+   * and a partition filter specification.
+   */
+  protected def visitPartition(
--- End diff --

I see what you mean now. Yes, I have no better idea indeed. Thanks.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-30 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r191823358
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -293,6 +313,15 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
 }
   }
 
+  /**
+   * Create a partition specification map without optional values
+   * and a partition filter specification.
+   */
+  protected def visitPartition(
--- End diff --

I tried to add a new parameter to `AlterTableDropPartitionCommand` earlier, 
but it was kind of hard.
Consider a SQL statement like the one below:

`DROP PARTITION(partitionVal1, expression1), PARTITION(partitionVal2, 
expression2)`

All of the partitions that need to be dropped are:
(`partitionVal1` intersect `expression1`) union 
(`partitionVal2` intersect `expression2`)

Using one tuple tells us that `partitionVal1` and `expression1` come from the 
same `partitionSpec`, so we should use `intersect`. Likewise, different tuples 
mean that (`partitionVal1 intersect expression1`) and (`partitionVal2 
intersect expression2`) come from different `partitionSpec`s, so we should use 
`union`.

If we don't use a tuple, it would be difficult to tell these cases apart, and 
difficult to decide between `intersect` and `union` when `partitionVal1` meets 
`expression1`/`expression2`; a sketch of the set algebra follows below.

Any ideas to replace this `tuple`?
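(For illustration, a self-contained sketch of that set algebra, with partition 
specs modeled as `Map[String, String]` and made-up sample data:)

    type PartSpec = Map[String, String]

    val bySpec1: Set[PartSpec] = Set(Map("p" -> "1"), Map("p" -> "2"))  // resolved from partitionVal1
    val byExpr1: Set[PartSpec] = Set(Map("p" -> "2"), Map("p" -> "3"))  // resolved from expression1
    val bySpec2: Set[PartSpec] = Set(Map("p" -> "5"))                   // resolved from partitionVal2
    val byExpr2: Set[PartSpec] = Set(Map("p" -> "5"), Map("p" -> "6"))  // resolved from expression2

    // (spec1 intersect expr1) union (spec2 intersect expr2)
    val toDrop = (bySpec1 intersect byExpr1) union (bySpec2 intersect byExpr2)
    // toDrop == Set(Map("p" -> "2"), Map("p" -> "5"))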


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-30 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r191805297
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
 tableName: TableIdentifier,
-specs: Seq[TablePartitionSpec],
+partitions: Seq[(TablePartitionSpec, Seq[Expression])],
 ifExists: Boolean,
 purge: Boolean,
 retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
 val catalog = sparkSession.sessionState.catalog
 val table = catalog.getTableMetadata(tableName)
+val resolver = sparkSession.sessionState.conf.resolver
 DDLUtils.verifyAlterTableType(catalog, table, isView = false)
 DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER 
TABLE DROP PARTITION")
 
-val normalizedSpecs = specs.map { spec =>
-  PartitioningUtils.normalizePartitionSpec(
-spec,
-table.partitionColumnNames,
-table.identifier.quotedString,
-sparkSession.sessionState.conf.resolver)
+val toDrop = partitions.flatMap { partition =>
+  val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+partition._1,
+table.partitionColumnNames,
+table.identifier.quotedString,
+sparkSession.sessionState.conf.resolver)
+
+  val partitionSet = {
+if (partition._2.nonEmpty) {
+  val parts = partition._2.map { expr =>
+val (attrName, value) = expr match {
+  case BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
+(name, constant.value)
+}
+if (!table.partitionColumnNames.exists(resolver(_, attrName))) 
{
+  throw new AnalysisException(s"${attrName} is not a valid 
partition column " +
+s"in table ${table.identifier.quotedString}.")
+}
+val dataType = table.partitionSchema.apply(attrName).dataType
+expr.withNewChildren(Seq(AttributeReference(attrName, 
dataType)(),
+  Cast(Literal(value.toString), dataType)))
--- End diff --

The constant's dataType may be different from the partition column's dataType, 
and the difference may cause problems when the expression compares them later.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-30 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r191795550
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
 tableName: TableIdentifier,
-specs: Seq[TablePartitionSpec],
+partitions: Seq[(TablePartitionSpec, Seq[Expression])],
 ifExists: Boolean,
 purge: Boolean,
 retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
 val catalog = sparkSession.sessionState.catalog
 val table = catalog.getTableMetadata(tableName)
+val resolver = sparkSession.sessionState.conf.resolver
 DDLUtils.verifyAlterTableType(catalog, table, isView = false)
 DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER 
TABLE DROP PARTITION")
 
-val normalizedSpecs = specs.map { spec =>
-  PartitioningUtils.normalizePartitionSpec(
-spec,
-table.partitionColumnNames,
-table.identifier.quotedString,
-sparkSession.sessionState.conf.resolver)
+val toDrop = partitions.flatMap { partition =>
+  val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+partition._1,
+table.partitionColumnNames,
+table.identifier.quotedString,
+sparkSession.sessionState.conf.resolver)
+
+  val partitionSet = {
+if (partition._2.nonEmpty) {
+  val parts = partition._2.map { expr =>
+val (attrName, value) = expr match {
+  case BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
+(name, constant.value)
+}
+if (!table.partitionColumnNames.exists(resolver(_, attrName))) 
{
+  throw new AnalysisException(s"${attrName} is not a valid 
partition column " +
+s"in table ${table.identifier.quotedString}.")
+}
+val dataType = table.partitionSchema.apply(attrName).dataType
+expr.withNewChildren(Seq(AttributeReference(attrName, 
dataType)(),
+  Cast(Literal(value.toString), dataType)))
+  }.reduce(And)
+
+  val partitions = catalog.listPartitionsByFilter(
+table.identifier, Seq(parts)).map(_.spec)
+  if (partitions.isEmpty && !ifExists) {
--- End diff --

There are two cases in which we can get an empty seq here from the expression 
filters in one `DROP PARTITION` statement:
1. there is at least one filter, but no partition matches it;
2. there are no filters at all.

If we don't add this check, things get confusing later, because in the first 
case we should `intersect` with `normalizedPartitionSpec`, but in the second 
case we shouldn't use `intersect`, since that would always return an empty 
result.

With this check we can treat the two cases differently (a sketch follows 
below):
1. ignore `normalizedPartitionSpec` and throw an exception directly;
2. return `Seq.empty`.
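(A minimal sketch of the two-case handling described here, with illustrative 
names, simplified types, and a plain exception in place of `AnalysisException`:)

    type PartSpec = Map[String, String]

    // `filters` are the parsed filter expressions (simplified to strings);
    // `matched` is what the catalog returned for them.
    def partitionsFromFilters(filters: Seq[String],
                              matched: Seq[PartSpec],
                              ifExists: Boolean): Seq[PartSpec] =
      if (filters.nonEmpty && matched.isEmpty && !ifExists) {
        // case 1: a filter exists but selects no partition
        throw new IllegalArgumentException("There is no partition for the filter")
      } else {
        matched  // case 2 (no filters) naturally yields Seq.empty
      }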


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-29 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r191492537
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -282,6 +282,26 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
 parts.toMap
   }
 
+  /**
+   * Create a partition filter specification.
+   */
+  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Seq[Expression] 
= withOrigin(ctx) {
+val parts = ctx.expression.asScala.map { pVal =>
+  expression(pVal) match {
+case EqualNullSafe(_, _) =>
+  throw new ParseException("'<=>' operator is not allowed in 
partition specification.", ctx)
+case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
+  cmp
+case bc @ BinaryComparison(constant: Literal, _) =>
+  throw new ParseException("Literal " + constant
--- End diff --

Sorry,  I was careless. Will fix this.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-28 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r191206009
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -293,6 +313,15 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
 }
   }
 
+  /**
+   * Create a partition specification map without optional values
+   * and a partition filter specification.
+   */
+  protected def visitPartition(
--- End diff --

Can we avoid this method? I find it quite confusing (I mean, it is a bit 
weird to return a tuple of a Map and a Seq of different things). We could 
add a new parameter to `AlterTableDropPartitionCommand` and use the other two 
methods directly...


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-28 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r191206680
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
 tableName: TableIdentifier,
-specs: Seq[TablePartitionSpec],
+partitions: Seq[(TablePartitionSpec, Seq[Expression])],
 ifExists: Boolean,
 purge: Boolean,
 retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
 val catalog = sparkSession.sessionState.catalog
 val table = catalog.getTableMetadata(tableName)
+val resolver = sparkSession.sessionState.conf.resolver
 DDLUtils.verifyAlterTableType(catalog, table, isView = false)
 DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER 
TABLE DROP PARTITION")
 
-val normalizedSpecs = specs.map { spec =>
-  PartitioningUtils.normalizePartitionSpec(
-spec,
-table.partitionColumnNames,
-table.identifier.quotedString,
-sparkSession.sessionState.conf.resolver)
+val toDrop = partitions.flatMap { partition =>
+  val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+partition._1,
+table.partitionColumnNames,
+table.identifier.quotedString,
+sparkSession.sessionState.conf.resolver)
+
+  val partitionSet = {
+if (partition._2.nonEmpty) {
+  val parts = partition._2.map { expr =>
+val (attrName, value) = expr match {
+  case BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
+(name, constant.value)
+}
+if (!table.partitionColumnNames.exists(resolver(_, attrName))) 
{
+  throw new AnalysisException(s"${attrName} is not a valid 
partition column " +
+s"in table ${table.identifier.quotedString}.")
+}
+val dataType = table.partitionSchema.apply(attrName).dataType
+expr.withNewChildren(Seq(AttributeReference(attrName, 
dataType)(),
+  Cast(Literal(value.toString), dataType)))
+  }.reduce(And)
+
+  val partitions = catalog.listPartitionsByFilter(
+table.identifier, Seq(parts)).map(_.spec)
+  if (partitions.isEmpty && !ifExists) {
--- End diff --

why do we need this check here?


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-28 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r191205128
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -282,6 +282,26 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
 parts.toMap
   }
 
+  /**
+   * Create a partition filter specification.
+   */
+  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Seq[Expression] 
= withOrigin(ctx) {
+val parts = ctx.expression.asScala.map { pVal =>
+  expression(pVal) match {
+case EqualNullSafe(_, _) =>
+  throw new ParseException("'<=>' operator is not allowed in 
partition specification.", ctx)
+case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
+  cmp
+case bc @ BinaryComparison(constant: Literal, _) =>
+  throw new ParseException("Literal " + constant
++ " is supported only on the rigth-side.", ctx)
+case _ =>
+  throw new ParseException("Invalid partition filter 
specification", ctx)
--- End diff --

It would be useful to output to the user which expression was invalid and 
why; a sketch of such a message follows below.
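(For illustration, a hedged sketch of how such a message could be built from 
the offending expression; `invalidFilterMessage` is a made-up helper:)

    import org.apache.spark.sql.catalyst.expressions.Expression

    // Name the rejected predicate and state the accepted shape.
    def invalidFilterMessage(e: Expression): String =
      s"Invalid partition filter specification: '${e.sql}'. Only binary " +
      "comparisons between a partition column and a literal are supported."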


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-28 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r191207036
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
 tableName: TableIdentifier,
-specs: Seq[TablePartitionSpec],
+partitions: Seq[(TablePartitionSpec, Seq[Expression])],
 ifExists: Boolean,
 purge: Boolean,
 retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
 val catalog = sparkSession.sessionState.catalog
 val table = catalog.getTableMetadata(tableName)
+val resolver = sparkSession.sessionState.conf.resolver
 DDLUtils.verifyAlterTableType(catalog, table, isView = false)
 DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER 
TABLE DROP PARTITION")
 
-val normalizedSpecs = specs.map { spec =>
-  PartitioningUtils.normalizePartitionSpec(
-spec,
-table.partitionColumnNames,
-table.identifier.quotedString,
-sparkSession.sessionState.conf.resolver)
+val toDrop = partitions.flatMap { partition =>
+  val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+partition._1,
+table.partitionColumnNames,
+table.identifier.quotedString,
+sparkSession.sessionState.conf.resolver)
+
+  val partitionSet = {
+if (partition._2.nonEmpty) {
+  val parts = partition._2.map { expr =>
+val (attrName, value) = expr match {
+  case BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
+(name, constant.value)
+}
+if (!table.partitionColumnNames.exists(resolver(_, attrName))) 
{
+  throw new AnalysisException(s"${attrName} is not a valid 
partition column " +
+s"in table ${table.identifier.quotedString}.")
+}
+val dataType = table.partitionSchema.apply(attrName).dataType
+expr.withNewChildren(Seq(AttributeReference(attrName, 
dataType)(),
+  Cast(Literal(value.toString), dataType)))
--- End diff --

why do we need to cast a new `Literal`? can't we just use `constant`?


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-28 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r191204888
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -282,6 +282,26 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
 parts.toMap
   }
 
+  /**
+   * Create a partition filter specification.
+   */
+  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Seq[Expression] 
= withOrigin(ctx) {
+val parts = ctx.expression.asScala.map { pVal =>
+  expression(pVal) match {
+case EqualNullSafe(_, _) =>
+  throw new ParseException("'<=>' operator is not allowed in 
partition specification.", ctx)
+case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
+  cmp
+case bc @ BinaryComparison(constant: Literal, _) =>
+  throw new ParseException("Literal " + constant
--- End diff --

nit: use `s""` and this can be a 1-line statement


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-23 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r190265826
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
 parts.toMap
   }
 
+  /**
+   * Create a partition filter specification.
+   */
+  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = 
withOrigin(ctx) {
+val parts = ctx.expression.asScala.map { pVal =>
+  expression(pVal) match {
+case EqualNullSafe(_, _) =>
+  throw new ParseException("'<=>' operator is not allowed in 
partition specification.", ctx)
+case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
+  cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), 
constant))
+case _ =>
+  throw new ParseException("Invalid partition filter 
specification", ctx)
+  }
+}
+if(parts.isEmpty) {
--- End diff --

You're right. I will change this.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-23 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r190221738
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -495,6 +496,150 @@ class HiveDDLSuite
 }
   }
 
+  def testDropPartition(dataType: DataType, value: Any): Unit = {
--- End diff --

Ok


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-23 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r190205589
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
 ---
@@ -128,7 +128,7 @@ case class InsertIntoHadoopFsRelationCommand(
 val deletedPartitions = initialMatchingPartitions.toSet -- 
updatedPartitions
 if (deletedPartitions.nonEmpty) {
   AlterTableDropPartitionCommand(
-catalogTable.get.identifier, deletedPartitions.toSeq,
+catalogTable.get.identifier, deletedPartitions.map(x => 
(x, Seq())).toSeq,
--- End diff --

`Seq.empty`


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-23 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r190204871
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -495,6 +496,150 @@ class HiveDDLSuite
 }
   }
 
+  def testDropPartition(dataType: DataType, value: Any): Unit = {
--- End diff --

Can we add a similar test checking that we do not drop any partitions when 
the condition is not met? A sketch follows below.
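(A hedged sketch of such a negative test, meant for HiveDDLSuite and assuming 
its usual helpers (`sql`, `withTable`, `checkAnswer`); the table and predicate 
are made up:)

    import org.apache.spark.sql.Row

    test("SPARK-17732: DROP PARTITION with an unmatched comparator drops nothing") {
      withTable("sales") {
        sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING)")
        sql("ALTER TABLE sales ADD PARTITION (country='KR')")
        // The predicate matches no partition; with IF EXISTS this must be a no-op.
        sql("ALTER TABLE sales DROP IF EXISTS PARTITION (country > 'ZZ')")
        checkAnswer(sql("SHOW PARTITIONS sales"), Row("country=KR"))
      }
    }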


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-23 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r190205608
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
 ---
@@ -826,8 +826,8 @@ class DDLParserSuite extends PlanTest with 
SharedSQLContext {
 val expected1_table = AlterTableDropPartitionCommand(
   tableIdent,
   Seq(
-Map("dt" -> "2008-08-08", "country" -> "us"),
-Map("dt" -> "2009-09-09", "country" -> "uk")),
+(Map("dt" -> "2008-08-08", "country" -> "us"), Seq()),
--- End diff --

ditto


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-23 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r190203697
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
 parts.toMap
   }
 
+  /**
+   * Create a partition filter specification.
+   */
+  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = 
withOrigin(ctx) {
+val parts = ctx.expression.asScala.map { pVal =>
+  expression(pVal) match {
+case EqualNullSafe(_, _) =>
+  throw new ParseException("'<=>' operator is not allowed in 
partition specification.", ctx)
+case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
+  cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), 
constant))
+case _ =>
+  throw new ParseException("Invalid partition filter 
specification", ctx)
+  }
+}
+if(parts.isEmpty) {
--- End diff --

Why aren't we returning `parts`? This `if` seems pretty useless.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-04-16 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r181860148
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
 parts.toMap
   }
 
+  /**
+   * Create a partition filter specification.
+   */
+  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = 
withOrigin(ctx) {
+val parts = ctx.expression.asScala.map { pVal =>
+  expression(pVal) match {
+case EqualNullSafe(_, _) =>
+  throw new ParseException("'<=>' operator is not allowed in 
partition specification.", ctx)
+case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
+  cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), 
constant))
--- End diff --

OK. I'll work on this in the next few days.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-04-16 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r181671014
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,58 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
 tableName: TableIdentifier,
-specs: Seq[TablePartitionSpec],
+partitions: Seq[(TablePartitionSpec, Expression)],
 ifExists: Boolean,
 purge: Boolean,
 retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
 val catalog = sparkSession.sessionState.catalog
 val table = catalog.getTableMetadata(tableName)
+val resolver = sparkSession.sessionState.conf.resolver
 DDLUtils.verifyAlterTableType(catalog, table, isView = false)
 DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER 
TABLE DROP PARTITION")
 
-val normalizedSpecs = specs.map { spec =>
-  PartitioningUtils.normalizePartitionSpec(
-spec,
-table.partitionColumnNames,
-table.identifier.quotedString,
-sparkSession.sessionState.conf.resolver)
+val toDrop = partitions.flatMap { partition =>
+  val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+partition._1,
+table.partitionColumnNames,
+table.identifier.quotedString,
+sparkSession.sessionState.conf.resolver)
+
+  val partitionSet = {
+if (partition._2 != null) {
+  partition._2.references.foreach { attr =>
+if (!table.partitionColumnNames.exists(resolver(_, 
attr.name))) {
+  throw new AnalysisException(s"${attr.name} is not a valid 
partition column " +
+s"in table ${table.identifier.quotedString}.")
+}
+  }
+val partitions = catalog.listPartitionsByFilter(
+  table.identifier, Seq(partition._2)).map(_.spec)
+if (partitions.isEmpty && !ifExists) {
+  throw new AnalysisException(s"There is no partition for 
${partition._2.sql}")
+}
+partitions
+} else {
+  Seq.empty[TablePartitionSpec]
+}
+  }.distinct
+
+  if (normalizedSpecs.isEmpty && partitionSet.isEmpty) {
--- End diff --

I tried this command in Hive, and Hive only dropped the intersection of the 
two partition filters.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-04-08 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r179944192
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
 parts.toMap
   }
 
+  /**
+   * Create a partition filter specification.
+   */
+  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = 
withOrigin(ctx) {
+val parts = ctx.expression.asScala.map { pVal =>
+  expression(pVal) match {
+case EqualNullSafe(_, _) =>
+  throw new ParseException("'<=>' operator is not allowed in 
partition specification.", ctx)
+case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
--- End diff --

we can also enforce this is the syntax, like here: 
https://github.com/apache/spark/pull/20999/files#diff-8c1cb2af4aa1109e08481dae79052cc3R269


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-04-08 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r179941575
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
 parts.toMap
   }
 
+  /**
+   * Create a partition filter specification.
+   */
+  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = 
withOrigin(ctx) {
+val parts = ctx.expression.asScala.map { pVal =>
+  expression(pVal) match {
+case EqualNullSafe(_, _) =>
+  throw new ParseException("'<=>' operator is not allowed in 
partition specification.", ctx)
+case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
--- End diff --

If we support the right side only, it seems useful to print an explicit 
error message like `left-side literal not supported`?


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-04-08 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r179941480
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
 parts.toMap
   }
 
+  /**
+   * Create a partition filter specification.
+   */
+  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = 
withOrigin(ctx) {
+val parts = ctx.expression.asScala.map { pVal =>
+  expression(pVal) match {
+case EqualNullSafe(_, _) =>
+  throw new ParseException("'<=>' operator is not allowed in 
partition specification.", ctx)
+case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
+  cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), 
constant))
--- End diff --

What if the partition column is not of `String` type?


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-04-08 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r179941447
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
 parts.toMap
   }
 
+  /**
+   * Create a partition filter specification.
+   */
+  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = 
withOrigin(ctx) {
+val parts = ctx.expression.asScala.map { pVal =>
+  expression(pVal) match {
+case EqualNullSafe(_, _) =>
+  throw new ParseException("'<=>' operator is not allowed in 
partition specification.", ctx)
+case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
+  cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), 
constant))
--- End diff --

Either way, we might need tests for non-int-literal cases.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-04-08 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r179941409
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
 parts.toMap
   }
 
+  /**
+   * Create a partition filter specification.
+   */
+  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = 
withOrigin(ctx) {
+val parts = ctx.expression.asScala.map { pVal =>
+  expression(pVal) match {
+case EqualNullSafe(_, _) =>
+  throw new ParseException("'<=>' operator is not allowed in 
partition specification.", ctx)
+case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
+  cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), 
constant))
--- End diff --

Is it OK to pass all types of literals here?


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-04-08 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r179940757
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,58 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
 tableName: TableIdentifier,
-specs: Seq[TablePartitionSpec],
+partitions: Seq[(TablePartitionSpec, Expression)],
 ifExists: Boolean,
 purge: Boolean,
 retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
 val catalog = sparkSession.sessionState.catalog
 val table = catalog.getTableMetadata(tableName)
+val resolver = sparkSession.sessionState.conf.resolver
 DDLUtils.verifyAlterTableType(catalog, table, isView = false)
 DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER 
TABLE DROP PARTITION")
 
-val normalizedSpecs = specs.map { spec =>
-  PartitioningUtils.normalizePartitionSpec(
-spec,
-table.partitionColumnNames,
-table.identifier.quotedString,
-sparkSession.sessionState.conf.resolver)
+val toDrop = partitions.flatMap { partition =>
+  val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+partition._1,
+table.partitionColumnNames,
+table.identifier.quotedString,
+sparkSession.sessionState.conf.resolver)
+
+  val partitionSet = {
+if (partition._2 != null) {
+  partition._2.references.foreach { attr =>
+if (!table.partitionColumnNames.exists(resolver(_, 
attr.name))) {
+  throw new AnalysisException(s"${attr.name} is not a valid 
partition column " +
+s"in table ${table.identifier.quotedString}.")
+}
+  }
+val partitions = catalog.listPartitionsByFilter(
+  table.identifier, Seq(partition._2)).map(_.spec)
+if (partitions.isEmpty && !ifExists) {
+  throw new AnalysisException(s"There is no partition for 
${partition._2.sql}")
+}
+partitions
+} else {
+  Seq.empty[TablePartitionSpec]
+}
+  }.distinct
+
+  if (normalizedSpecs.isEmpty && partitionSet.isEmpty) {
--- End diff --

Can't we just return `partitionSet ++ normalizedSpecs`? I think it is 
wrong to use `intersect`; we should drop all of them, shouldn't we?
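(For illustration, the contrast between the two readings, with specs modeled 
as `Map[String, String]` and made-up data; per the 2018-04-16 reply further 
up, Hive drops only the intersection:)

    type PartSpec = Map[String, String]

    val normalizedSpecs: Seq[PartSpec] = Seq(Map("p" -> "1"), Map("p" -> "2"))  // from the spec part
    val partitionSet:    Seq[PartSpec] = Seq(Map("p" -> "2"), Map("p" -> "3"))  // from the filter part

    val dropAll    = (partitionSet ++ normalizedSpecs).distinct  // this comment's suggestion
    val dropCommon = partitionSet intersect normalizedSpecs      // Hive's behavior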


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-04-08 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r179940633
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
 parts.toMap
   }
 
+  /**
+   * Create a partition filter specification.
+   */
+  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = 
withOrigin(ctx) {
+val parts = ctx.expression.asScala.map { pVal =>
+  expression(pVal) match {
+case EqualNullSafe(_, _) =>
+  throw new ParseException("'<=>' operator is not allowed in 
partition specification.", ctx)
+case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
+  cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), 
constant))
+case _ =>
+  throw new ParseException("Invalid partition filter 
specification", ctx)
+  }
+}
+if(parts.isEmpty) {
--- End diff --

Wouldn't it be better to return the `Seq[Expression]` as it is? Later we need 
it in that form (in `listPartitionsByFilter`), and this way we can avoid using 
`null`, which is a good thing too...


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-04-08 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r179940472
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
 parts.toMap
   }
 
+  /**
+   * Create a partition filter specification.
+   */
+  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = 
withOrigin(ctx) {
+val parts = ctx.expression.asScala.map { pVal =>
+  expression(pVal) match {
+case EqualNullSafe(_, _) =>
+  throw new ParseException("'<=>' operator is not allowed in 
partition specification.", ctx)
+case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
--- End diff --

Hive supports them only on the right side, so I think it makes sense to do 
the same here.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-04-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r179934313
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
 parts.toMap
   }
 
+  /**
+   * Create a partition filter specification.
+   */
+  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = 
withOrigin(ctx) {
+val parts = ctx.expression.asScala.map { pVal =>
+  expression(pVal) match {
+case EqualNullSafe(_, _) =>
+  throw new ParseException("'<=>' operator is not allowed in 
partition specification.", ctx)
+case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
--- End diff --

Still the same question here: does the constant have to be on the right side?


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2017-11-07 Thread DazhuangSu
GitHub user DazhuangSu opened a pull request:

https://github.com/apache/spark/pull/19691

[SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITION should support 
comparators

## What changes were proposed in this pull request?

This PR is inspired by @dongjoon-hyun.

Quoting from https://github.com/apache/spark/pull/15704:

> **What changes were proposed in this pull request?**
This PR aims to support comparators, e.g. '<', '<=', '>', '>=', again 
in Apache Spark 2.0 for backward compatibility.

**Spark 1.6**

    scala> sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
    res0: org.apache.spark.sql.DataFrame = [result: string]
    scala> sql("ALTER TABLE sales DROP PARTITION (country < 'KR')")
    res1: org.apache.spark.sql.DataFrame = [result: string]

**Spark 2.0**

    scala> sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
    res0: org.apache.spark.sql.DataFrame = []
    scala> sql("ALTER TABLE sales DROP PARTITION (country < 'KR')")
    org.apache.spark.sql.catalyst.parser.ParseException:
    mismatched input '<' expecting {')', ','}(line 1, pos 42)

After this PR, it's supported.

**How was this patch tested?**
Pass the Jenkins test with a newly added testcase.


https://github.com/apache/spark/pull/16036 points out that using an int 
literal in DROP PARTITION will fail after 
https://github.com/apache/spark/pull/15704 is patched in.
The reason this fails in https://github.com/apache/spark/pull/15704 is that 
AlterTableDropPartitionCommand tells BinaryComparison and EqualTo apart with 
the following code:

    private def isRangeComparison(expr: Expression): Boolean = {
      expr.find(e => e.isInstanceOf[BinaryComparison] && !e.isInstanceOf[EqualTo]).isDefined
    }
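(For illustration, a self-contained sketch of how that check classifies 
predicates; assumes Spark catalyst on the classpath:)

    import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
    import org.apache.spark.sql.catalyst.expressions._

    def isRangeComparison(expr: Expression): Boolean =
      expr.find(e => e.isInstanceOf[BinaryComparison] && !e.isInstanceOf[EqualTo]).isDefined

    val lt = LessThan(UnresolvedAttribute("country"), Literal("KR"))
    val eq = EqualTo(UnresolvedAttribute("country"), Literal("KR"))
    assert(isRangeComparison(lt))   // '<' is a range comparison
    assert(!isRangeComparison(eq))  // '=' is treated as a plain partition spec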

This PR resolves the problem by recognizing the drop condition while parsing 
the SQL.

## How was this patch tested?
New testcase introduced from https://github.com/apache/spark/pull/15704


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/DazhuangSu/spark SPARK-17732

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19691.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19691


commit 20f658ad8e14a94dd23bff6a8d795124d1db24e9
Author: Dylan Su 
Date:   2017-11-08T03:44:28Z

[SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITION should support 
comparators




---
