[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...

2018-09-05 Thread DazhuangSu
Github user DazhuangSu commented on the issue:

https://github.com/apache/spark/pull/19691
  
@maropu  
Sorry, I don't really have much time this month.
I can close this PR so that somebody else can continue working on this problem.


---




[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...

2018-08-30 Thread DazhuangSu
Github user DazhuangSu commented on the issue:

https://github.com/apache/spark/pull/19691
  
Sorry guys, I've been a little busy recently.
I will resolve the failing tests this weekend first.


---




[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...

2018-08-23 Thread DazhuangSu
Github user DazhuangSu commented on the issue:

https://github.com/apache/spark/pull/19691
  
@maropu OK


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-07-25 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r205013855
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand(
  *
  * The syntax of this command is:
  * {{{
- *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
+ *   ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1)
+ *     [, PARTITION (spec2, expr2), ...] [PURGE];
  * }}}
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitions: Seq[(TablePartitionSpec, Seq[Expression])],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val toDrop = partitions.flatMap { partition =>
+      if (partition._1.isEmpty && !partition._2.isEmpty) {
+        // There are only expressions in this drop condition.
+        extractFromPartitionFilter(partition._2, catalog, table, resolver)
+      } else if (!partition._1.isEmpty && partition._2.isEmpty) {
+        // There are only partitionSpecs in this drop condition.
+        extractFromPartitionSpec(partition._1, table, resolver)
+      } else if (!partition._1.isEmpty && !partition._2.isEmpty) {
+        // This drop condition has both partitionSpecs and expressions.
+        extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect(
--- End diff --

@mgaido91 I understand your point; yes, it would be inefficient. I will work on this soon.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-07 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r193691275
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand(
  *
  * The syntax of this command is:
  * {{{
- *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
+ *   ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1)
+ *     [, PARTITION (spec2, expr2), ...] [PURGE];
  * }}}
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitions: Seq[(TablePartitionSpec, Seq[Expression])],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val toDrop = partitions.flatMap { partition =>
+      if (partition._1.isEmpty && !partition._2.isEmpty) {
+        // There are only expressions in this drop condition.
+        extractFromPartitionFilter(partition._2, catalog, table, resolver)
+      } else if (!partition._1.isEmpty && partition._2.isEmpty) {
+        // There are only partitionSpecs in this drop condition.
+        extractFromPartitionSpec(partition._1, table, resolver)
+      } else if (!partition._1.isEmpty && !partition._2.isEmpty) {
+        // This drop condition has both partitionSpecs and expressions.
+        extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect(
--- End diff --

Hi @mgaido91, there is one problem after I changed the syntax: when I run the SQL `DROP PARTITION (p >= 2)` it throws
`org.apache.spark.sql.AnalysisException: cannot resolve 'p' given input columns: []`
I'm trying to find a way to figure it out.

By the way, is a syntax like `((partitionVal (',' partitionVal)*) | (expression (',' expression)*))` legal? I wrote an antlr4 syntax test, but it didn't work as I expected.

Besides, I was wrong the other day: the if conditions won't be inefficient when there are a lot of partitions. They may be inefficient if there are a lot of dropPartitionSpecs, which I don't think can happen easily.
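
On the first problem, for reference, a minimal sketch (not necessarily the fix this PR ends up with) of binding the parsed predicate's attribute to the table's partition schema by hand, so the comparison no longer needs the analyzer to resolve `p` against an empty plan; the schema and predicate below are made-up stand-ins:

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GreaterThanOrEqual, Literal}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical partition schema standing in for table.partitionSchema.
val partitionSchema = StructType(Seq(StructField("p", LongType)))

// What the parser hands back for `DROP PARTITION (p >= 2)`.
val parsed: Expression = GreaterThanOrEqual(UnresolvedAttribute("p"), Literal(2))

// Replace the unresolved attribute with a reference typed from the
// partition schema, so later evaluation needs no analyzer pass.
val bound = parsed.transformUp {
  case UnresolvedAttribute(name :: Nil) =>
    val field = partitionSchema(name)
    AttributeReference(field.name, field.dataType)()
}
```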


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-06 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r193356194
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand(
  *
  * The syntax of this command is:
  * {{{
- *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
+ *   ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1)
+ *     [, PARTITION (spec2, expr2), ...] [PURGE];
  * }}}
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitions: Seq[(TablePartitionSpec, Seq[Expression])],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val toDrop = partitions.flatMap { partition =>
+      if (partition._1.isEmpty && !partition._2.isEmpty) {
+        // There are only expressions in this drop condition.
+        extractFromPartitionFilter(partition._2, catalog, table, resolver)
+      } else if (!partition._1.isEmpty && partition._2.isEmpty) {
+        // There are only partitionSpecs in this drop condition.
+        extractFromPartitionSpec(partition._1, table, resolver)
+      } else if (!partition._1.isEmpty && !partition._2.isEmpty) {
+        // This drop condition has both partitionSpecs and expressions.
+        extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect(
--- End diff --

I mean how to better define `AlterTableDropPartitionCommand` in `ddl.scala`. It needs to handle both

`AlterTableDropPartitionCommand(
    tableName: TableIdentifier,
    partitions: Seq[Seq[Expression]],
    ifExists: Boolean,
    purge: Boolean,
    retainData: Boolean)`

and

`AlterTableDropPartitionCommand(
    tableName: TableIdentifier,
    partitions: Seq[TablePartitionSpec],
    ifExists: Boolean,
    purge: Boolean,
    retainData: Boolean)`

Maybe by telling the different cases apart inside the method?
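
For illustration, a self-contained toy of that "tell the cases apart inside the method" idea; `fromSpec`/`fromFilter` are hypothetical stand-ins for the draft's extract helpers, and partition specs are reduced to plain maps:

```scala
// Toy model: a partition spec is just a column -> value map here.
type TablePartitionSpec = Map[String, String]

// Hypothetical stand-ins for extractFromPartitionSpec / extractFromPartitionFilter.
def fromSpec(spec: TablePartitionSpec): Seq[TablePartitionSpec] = Seq(spec)
def fromFilter(exprs: Seq[String]): Seq[TablePartitionSpec] =
  Seq(Map("p" -> "1"), Map("p" -> "2")) // pretend these are the catalog's matches

// One command type; run() branches on which half of each tuple is populated.
def toDrop(partitions: Seq[(TablePartitionSpec, Seq[String])]): Seq[TablePartitionSpec] =
  partitions.flatMap {
    case (spec, exprs) if spec.isEmpty  => fromFilter(exprs)  // expressions only
    case (spec, exprs) if exprs.isEmpty => fromSpec(spec)     // partitionSpecs only
    case (spec, exprs)                  =>                    // both: intersect the two sets
      fromFilter(exprs).intersect(fromSpec(spec))
  }
```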


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-05 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r193171992
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand(
  *
  * The syntax of this command is:
  * {{{
- *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
+ *   ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1)
+ *     [, PARTITION (spec2, expr2), ...] [PURGE];
  * }}}
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitions: Seq[(TablePartitionSpec, Seq[Expression])],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val toDrop = partitions.flatMap { partition =>
+      if (partition._1.isEmpty && !partition._2.isEmpty) {
+        // There are only expressions in this drop condition.
+        extractFromPartitionFilter(partition._2, catalog, table, resolver)
+      } else if (!partition._1.isEmpty && partition._2.isEmpty) {
+        // There are only partitionSpecs in this drop condition.
+        extractFromPartitionSpec(partition._1, table, resolver)
+      } else if (!partition._1.isEmpty && !partition._2.isEmpty) {
+        // This drop condition has both partitionSpecs and expressions.
+        extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect(
--- End diff --

Yeah, I agree. The hard part may be how to convert a `partitionSpec` to an `EqualTo`.
I think it's better to let the `AstBuilder` handle this. If so, we may have to have two `AlterTableDropPartitionCommand` variants in `ddl.scala`, one for all `partitionSpec`s and one for all `expression`s. But that may be a bit weird.
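
As a sketch of that `partitionSpec` -> `EqualTo` conversion using catalyst's expression classes (illustrative, not this PR's code): a `TablePartitionSpec` is a `Map[String, String]`, so build one `EqualTo` per entry and AND them into a single predicate.

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Expression, Literal}

// Made-up spec for PARTITION (country='KR', quarter='4').
val spec: Map[String, String] = Map("country" -> "KR", "quarter" -> "4")

// One EqualTo per entry, combined with And into a single filter expression.
val predicate: Expression = spec.map { case (col, value) =>
  EqualTo(UnresolvedAttribute(col), Literal(value)): Expression
}.reduce(And)
```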


---




[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...

2018-06-05 Thread DazhuangSu
Github user DazhuangSu commented on the issue:

https://github.com/apache/spark/pull/19691
  
retest this please


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-01 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192425378
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitions: Seq[(TablePartitionSpec, Seq[Expression])],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val toDrop = partitions.flatMap { partition =>
+      val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+        partition._1,
+        table.partitionColumnNames,
+        table.identifier.quotedString,
+        sparkSession.sessionState.conf.resolver)
+
+      val partitionSet = {
+        if (partition._2.nonEmpty) {
+          val parts = partition._2.map { expr =>
+            val (attrName, value) = expr match {
+              case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
+                (name, constant.value)
+            }
+            if (!table.partitionColumnNames.exists(resolver(_, attrName))) {
+              throw new AnalysisException(s"${attrName} is not a valid partition column " +
+                s"in table ${table.identifier.quotedString}.")
+            }
+            val dataType = table.partitionSchema.apply(attrName).dataType
+            expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(),
+              Cast(Literal(value.toString), dataType)))
+          }.reduce(And)
+
+          val partitions = catalog.listPartitionsByFilter(
+            table.identifier, Seq(parts)).map(_.spec)
+          if (partitions.isEmpty && !ifExists) {
--- End diff --

Sure. I will make this code more readable.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-01 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192399827
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitions: Seq[(TablePartitionSpec, Seq[Expression])],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val toDrop = partitions.flatMap { partition =>
+      val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+        partition._1,
+        table.partitionColumnNames,
+        table.identifier.quotedString,
+        sparkSession.sessionState.conf.resolver)
+
+      val partitionSet = {
+        if (partition._2.nonEmpty) {
+          val parts = partition._2.map { expr =>
+            val (attrName, value) = expr match {
+              case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
+                (name, constant.value)
+            }
+            if (!table.partitionColumnNames.exists(resolver(_, attrName))) {
+              throw new AnalysisException(s"${attrName} is not a valid partition column " +
+                s"in table ${table.identifier.quotedString}.")
+            }
+            val dataType = table.partitionSchema.apply(attrName).dataType
+            expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(),
+              Cast(Literal(value.toString), dataType)))
--- End diff --

lol, you are right. I will update it.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-01 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192388998
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitions: Seq[(TablePartitionSpec, Seq[Expression])],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val toDrop = partitions.flatMap { partition =>
+      val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+        partition._1,
+        table.partitionColumnNames,
+        table.identifier.quotedString,
+        sparkSession.sessionState.conf.resolver)
+
+      val partitionSet = {
+        if (partition._2.nonEmpty) {
+          val parts = partition._2.map { expr =>
+            val (attrName, value) = expr match {
+              case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
+                (name, constant.value)
+            }
+            if (!table.partitionColumnNames.exists(resolver(_, attrName))) {
+              throw new AnalysisException(s"${attrName} is not a valid partition column " +
+                s"in table ${table.identifier.quotedString}.")
+            }
+            val dataType = table.partitionSchema.apply(attrName).dataType
+            expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(),
+              Cast(Literal(value.toString), dataType)))
+          }.reduce(And)
+
+          val partitions = catalog.listPartitionsByFilter(
+            table.identifier, Seq(parts)).map(_.spec)
+          if (partitions.isEmpty && !ifExists) {
--- End diff --

I'm a little confused. If we return `Seq.empty` to `partitionSet` for both cases, then the code will reach line 570 in both cases.
How can we return empty to `toDrop` for the first case but return `partitionVal1` for the second case at this line?


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-01 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192380851
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitions: Seq[(TablePartitionSpec, Seq[Expression])],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val toDrop = partitions.flatMap { partition =>
+      val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+        partition._1,
+        table.partitionColumnNames,
+        table.identifier.quotedString,
+        sparkSession.sessionState.conf.resolver)
+
+      val partitionSet = {
+        if (partition._2.nonEmpty) {
+          val parts = partition._2.map { expr =>
+            val (attrName, value) = expr match {
+              case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
+                (name, constant.value)
+            }
+            if (!table.partitionColumnNames.exists(resolver(_, attrName))) {
+              throw new AnalysisException(s"${attrName} is not a valid partition column " +
+                s"in table ${table.identifier.quotedString}.")
+            }
+            val dataType = table.partitionSchema.apply(attrName).dataType
+            expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(),
+              Cast(Literal(value.toString), dataType)))
--- End diff --

If we use the parsed `constant` without casting it to the partition column's dataType, it will throw an exception:
>java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
at scala.math.Ordering$Long$.compare(Ordering.scala:264)
at scala.math.Ordering$class.gteq(Ordering.scala:91)
at scala.math.Ordering$Long$.gteq(Ordering.scala:264)
at org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual.nullSafeEval(predicates.scala:710)
at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:423)

for the case
`CREATE TABLE tbl_x (a INT) PARTITIONED BY (p LONG)`
`ALTER TABLE tbl_x DROP PARTITION (p >= 1)`
that I mentioned above.
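
A minimal sketch of the cast in question, mirroring the `Cast(Literal(value.toString), dataType)` line in the diff above; the column name and types come from the example:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, GreaterThanOrEqual, Literal}
import org.apache.spark.sql.types.LongType

// The INT literal 1 is cast to the LONG partition column's type before the
// comparison is built, so nullSafeEval no longer unboxes the wrong primitive.
val p = AttributeReference("p", LongType)()
val fixed = GreaterThanOrEqual(p, Cast(Literal("1"), LongType))
```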


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-31 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192147689
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitions: Seq[(TablePartitionSpec, Seq[Expression])],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val toDrop = partitions.flatMap { partition =>
+      val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+        partition._1,
+        table.partitionColumnNames,
+        table.identifier.quotedString,
+        sparkSession.sessionState.conf.resolver)
+
+      val partitionSet = {
+        if (partition._2.nonEmpty) {
+          val parts = partition._2.map { expr =>
+            val (attrName, value) = expr match {
+              case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
+                (name, constant.value)
+            }
+            if (!table.partitionColumnNames.exists(resolver(_, attrName))) {
+              throw new AnalysisException(s"${attrName} is not a valid partition column " +
+                s"in table ${table.identifier.quotedString}.")
+            }
+            val dataType = table.partitionSchema.apply(attrName).dataType
+            expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(),
+              Cast(Literal(value.toString), dataType)))
+          }.reduce(And)
+
+          val partitions = catalog.listPartitionsByFilter(
+            table.identifier, Seq(parts)).map(_.spec)
+          if (partitions.isEmpty && !ifExists) {
--- End diff --

Let me explain these two cases more clearly, with two SQLs as examples (`useless_expression` means an expression that matches no partitions):
`ALTER TABLE DROP PARTITION(partitionVal1, useless_expression)`
`ALTER TABLE DROP PARTITION(partitionVal1)`

The first SQL should drop `partitionVal1` intersected with `useless_expression`, which is empty.
The second SQL should drop partition `partitionVal1`.

If we return `Seq.empty` to `partitionSet` for both SQLs, it will be impossible to tell them apart later.
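
A self-contained toy of that distinction; the `Option` is purely illustrative (the draft keeps the check inline), and specs are reduced to plain maps:

```scala
// "No filter present" vs "filter present but matched nothing", modeled as
// None vs Some(Seq.empty). Returning Seq.empty for both would conflate them.
val bySpec = Seq(Map("p" -> "1"))                      // partitionVal1

def toDrop(byFilter: Option[Seq[Map[String, String]]]): Seq[Map[String, String]] =
  byFilter match {
    case Some(matched) => bySpec.intersect(matched)    // filter present: intersect
    case None          => bySpec                       // no filter: keep the spec side
  }

toDrop(Some(Seq.empty)) // DROP PARTITION(partitionVal1, useless_expression) -> Nil
toDrop(None)            // DROP PARTITION(partitionVal1) -> Seq(Map(p -> 1))
```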


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-31 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192130248
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitions: Seq[(TablePartitionSpec, Seq[Expression])],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val toDrop = partitions.flatMap { partition =>
+      val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+        partition._1,
+        table.partitionColumnNames,
+        table.identifier.quotedString,
+        sparkSession.sessionState.conf.resolver)
+
+      val partitionSet = {
+        if (partition._2.nonEmpty) {
+          val parts = partition._2.map { expr =>
+            val (attrName, value) = expr match {
+              case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
+                (name, constant.value)
+            }
+            if (!table.partitionColumnNames.exists(resolver(_, attrName))) {
+              throw new AnalysisException(s"${attrName} is not a valid partition column " +
+                s"in table ${table.identifier.quotedString}.")
+            }
+            val dataType = table.partitionSchema.apply(attrName).dataType
+            expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(),
+              Cast(Literal(value.toString), dataType)))
--- End diff --

OK, I get your point.
I just ran a quick test; it threw an exception "java.lang.RuntimeException: Unsupported literal type class org.apache.spark.unsafe.types.UTF8String" at this line when I ran:
`ALTER TABLE table_a PARTITION(a < 'test')`
so a one-line change in `literals.scala` is needed: the method `def apply(v: Any): Literal` (literals.scala, line 52) only supports `String`, not `UTF8String`, for now.
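
A hedged sketch of the shape of that one-line change, written as a standalone helper rather than the actual patch to `literals.scala`:

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String

// Literal.apply(v: Any) matches on String but not UTF8String; teaching it
// the extra case would look roughly like this.
def toLiteral(v: Any): Literal = v match {
  case s: String     => Literal(UTF8String.fromString(s), StringType) // existing behavior
  case u: UTF8String => Literal(u, StringType)                        // the missing case
  case other         => Literal(other)                                // defer to Literal.apply
}
```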



---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-31 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r192115083
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitions: Seq[(TablePartitionSpec, Seq[Expression])],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val toDrop = partitions.flatMap { partition =>
+      val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+        partition._1,
+        table.partitionColumnNames,
+        table.identifier.quotedString,
+        sparkSession.sessionState.conf.resolver)
+
+      val partitionSet = {
+        if (partition._2.nonEmpty) {
+          val parts = partition._2.map { expr =>
+            val (attrName, value) = expr match {
+              case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
+                (name, constant.value)
+            }
+            if (!table.partitionColumnNames.exists(resolver(_, attrName))) {
+              throw new AnalysisException(s"${attrName} is not a valid partition column " +
+                s"in table ${table.identifier.quotedString}.")
+            }
+            val dataType = table.partitionSchema.apply(attrName).dataType
+            expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(),
+              Cast(Literal(value.toString), dataType)))
--- End diff --

I don't think we can throw an AnalysisException in every situation, e.g.
`CREATE TABLE tbl_x (a INT) PARTITIONED BY (p LONG)`
`ALTER TABLE tbl_x DROP PARTITION (p < 1)`
In this case the partition column's dataType is `LONG` for sure, but the constant's dataType is `INT`.

I think it's reasonable to support at least this situation.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-30 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r191823358
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -293,6 +313,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
   }
 
+  /**
+   * Create a partition specification map without optional values
+   * and a partition filter specification.
+   */
+  protected def visitPartition(
--- End diff --

I tried to add a new parameter to `AlterTableDropPartitionCommand` earlier, but it was kind of hard.
Think about the SQL below:

`DROP PARTITION(partitionVal1, expression1), PARTITION(partitionVal2, expression2)`

All of the partitions that need to be dropped are:
(`partitionVal1` intersect `expression1`) union (`partitionVal2` intersect `expression2`)

Using one tuple tells us that `partitionVal1` and `expression1` come from the same `partitionSpec`, so we should use `intersect`. Likewise, different tuples mean (`partitionVal1` intersect `expression1`) and (`partitionVal2` intersect `expression2`) come from different `partitionSpec`s, so we should use `union`.

If we don't use a tuple, it would be difficult to tell the cases apart, and difficult to decide between `intersect` and `union` when `partitionVal1` meets `expression1`/`expression2`.

Any ideas to replace this tuple?
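
A self-contained toy of those semantics, with partitions reduced to spec maps and made-up match results:

```scala
type Spec = Map[String, String]

// Each tuple: (partitions named by the spec part, partitions matched by the
// expression part). Within a tuple intersect; across tuples union.
val clauses: Seq[(Seq[Spec], Seq[Spec])] = Seq(
  (Seq(Map("p" -> "1")), Seq(Map("p" -> "1"), Map("p" -> "2"))), // PARTITION(partitionVal1, expression1)
  (Seq(Map("p" -> "3")), Seq(Map("p" -> "3")))                   // PARTITION(partitionVal2, expression2)
)

val toDrop = clauses.flatMap { case (bySpec, byFilter) =>
  bySpec.intersect(byFilter)                                     // intersect within a tuple
}.distinct                                                       // union across tuples
// -> Seq(Map(p -> 1), Map(p -> 3))
```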


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-30 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r191805297
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitions: Seq[(TablePartitionSpec, Seq[Expression])],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val toDrop = partitions.flatMap { partition =>
+      val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+        partition._1,
+        table.partitionColumnNames,
+        table.identifier.quotedString,
+        sparkSession.sessionState.conf.resolver)
+
+      val partitionSet = {
+        if (partition._2.nonEmpty) {
+          val parts = partition._2.map { expr =>
+            val (attrName, value) = expr match {
+              case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
+                (name, constant.value)
+            }
+            if (!table.partitionColumnNames.exists(resolver(_, attrName))) {
+              throw new AnalysisException(s"${attrName} is not a valid partition column " +
+                s"in table ${table.identifier.quotedString}.")
+            }
+            val dataType = table.partitionSchema.apply(attrName).dataType
+            expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(),
+              Cast(Literal(value.toString), dataType)))
--- End diff --

The constant's dataType may be different from the partition column's dataType. The difference may cause problems later when the expression compares them.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-30 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r191795550
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitions: Seq[(TablePartitionSpec, Seq[Expression])],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val toDrop = partitions.flatMap { partition =>
+      val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+        partition._1,
+        table.partitionColumnNames,
+        table.identifier.quotedString,
+        sparkSession.sessionState.conf.resolver)
+
+      val partitionSet = {
+        if (partition._2.nonEmpty) {
+          val parts = partition._2.map { expr =>
+            val (attrName, value) = expr match {
+              case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
+                (name, constant.value)
+            }
+            if (!table.partitionColumnNames.exists(resolver(_, attrName))) {
+              throw new AnalysisException(s"${attrName} is not a valid partition column " +
+                s"in table ${table.identifier.quotedString}.")
+            }
+            val dataType = table.partitionSchema.apply(attrName).dataType
+            expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(),
+              Cast(Literal(value.toString), dataType)))
+          }.reduce(And)
+
+          val partitions = catalog.listPartitionsByFilter(
+            table.identifier, Seq(parts)).map(_.spec)
+          if (partitions.isEmpty && !ifExists) {
--- End diff --

There are two cases in which we can get an empty seq here from the expression filters in one `DROP PARTITION` sql:
1. there is at least one filter, but no partitions match it.
2. there are no filters at all.

If we don't add this check, things get confusing later, because in the first case we should `intersect` with `normalizedPartitionSpec`, but in the second case we shouldn't use `intersect`, since that would return an empty result.

With this check we can treat them in different ways:
1. disregard `normalizedPartitionSpec` and throw an exception directly.
2. return `Seq.empty`.
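
A compilable sketch of the two treatments, with stand-in names and a plain exception in place of `AnalysisException` (whose constructor isn't public outside the sql package):

```scala
// Case 1: filters present but nothing matched -> throw unless IF EXISTS.
// Case 2: no filters at all -> just return Seq.empty for the filter side.
def partitionsForFilters(
    filters: Seq[String],              // stand-in for the Seq[Expression] filters
    matched: Seq[Map[String, String]], // stand-in for listPartitionsByFilter's result
    ifExists: Boolean): Seq[Map[String, String]] =
  if (filters.nonEmpty) {
    if (matched.isEmpty && !ifExists) {
      throw new IllegalArgumentException(
        s"There is no partition for ${filters.mkString(" AND ")}")
    }
    matched
  } else {
    Seq.empty
  }
```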


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-29 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r191492537
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -282,6 +282,26 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     parts.toMap
   }
 
+  /**
+   * Create a partition filter specification.
+   */
+  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Seq[Expression] = withOrigin(ctx) {
+    val parts = ctx.expression.asScala.map { pVal =>
+      expression(pVal) match {
+        case EqualNullSafe(_, _) =>
+          throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx)
+        case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
+          cmp
+        case bc @ BinaryComparison(constant: Literal, _) =>
+          throw new ParseException("Literal " + constant
--- End diff --

Sorry, I was careless. I will fix this.


---




[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...

2018-05-23 Thread DazhuangSu
Github user DazhuangSu commented on the issue:

https://github.com/apache/spark/pull/19691
  
@mgaido91 It's totally OK to do it in the parser.
I was thinking it might require more code changes in SqlBase than the current approach, and only for this one case.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-23 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r190265826
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     parts.toMap
   }
 
+  /**
+   * Create a partition filter specification.
+   */
+  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) {
+    val parts = ctx.expression.asScala.map { pVal =>
+      expression(pVal) match {
+        case EqualNullSafe(_, _) =>
+          throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx)
+        case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
+          cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant))
+        case _ =>
+          throw new ParseException("Invalid partition filter specification", ctx)
+      }
+    }
+    if(parts.isEmpty) {
--- End diff --

You're right. I will change this.


---




[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...

2018-05-23 Thread DazhuangSu
Github user DazhuangSu commented on the issue:

https://github.com/apache/spark/pull/19691
  
@mgaido91 Will it be OK to support a Literal only on the right-hand side, like this?

https://github.com/apache/spark/pull/19691/files#diff-9847f5cef7cf7fbc5830fbc6b779ee10R295


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-05-23 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r190221738
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala ---
@@ -495,6 +496,150 @@ class HiveDDLSuite
     }
   }
 
+  def testDropPartition(dataType: DataType, value: Any): Unit = {
--- End diff --

Ok


---




[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...

2018-05-22 Thread DazhuangSu
Github user DazhuangSu commented on the issue:

https://github.com/apache/spark/pull/19691
  
@mgaido91 
What's your opinion on this review at this point?
https://github.com/apache/spark/pull/19691/files#r179940757
I think it's more reasonable to use `intersect`.


---




[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...

2018-05-21 Thread DazhuangSu
Github user DazhuangSu commented on the issue:

https://github.com/apache/spark/pull/19691
  
@maropu @mgaido91 
The patch has passed all tests. Any advice, please?


---




[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...

2018-05-20 Thread DazhuangSu
Github user DazhuangSu commented on the issue:

https://github.com/apache/spark/pull/19691
  
@mgaido91 Sorry, I've been a little busy recently.
The PR is almost ready; I will update it soon.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-04-16 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r181860148
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     parts.toMap
   }
 
+  /**
+   * Create a partition filter specification.
+   */
+  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) {
+    val parts = ctx.expression.asScala.map { pVal =>
+      expression(pVal) match {
+        case EqualNullSafe(_, _) =>
+          throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx)
+        case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
+          cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant))
--- End diff --

OK. I'll work on this over the next few days.


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-04-16 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r181671014
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,58 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitions: Seq[(TablePartitionSpec, Expression)],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val toDrop = partitions.flatMap { partition =>
+      val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+        partition._1,
+        table.partitionColumnNames,
+        table.identifier.quotedString,
+        sparkSession.sessionState.conf.resolver)
+
+      val partitionSet = {
+        if (partition._2 != null) {
+          partition._2.references.foreach { attr =>
+            if (!table.partitionColumnNames.exists(resolver(_, attr.name))) {
+              throw new AnalysisException(s"${attr.name} is not a valid partition column " +
+                s"in table ${table.identifier.quotedString}.")
+            }
+          }
+          val partitions = catalog.listPartitionsByFilter(
+            table.identifier, Seq(partition._2)).map(_.spec)
+          if (partitions.isEmpty && !ifExists) {
+            throw new AnalysisException(s"There is no partition for ${partition._2.sql}")
+          }
+          partitions
+        } else {
+          Seq.empty[TablePartitionSpec]
+        }
+      }.distinct
+
+      if (normalizedSpecs.isEmpty && partitionSet.isEmpty) {
--- End diff --

I tried this command in Hive, and Hive only dropped the intersection of the two partition filters.


---




[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...

2017-11-16 Thread DazhuangSu
Github user DazhuangSu commented on the issue:

https://github.com/apache/spark/pull/19691
  
@gatorsmile @dongjoon-hyun 
Could you give me some advice, please?


---




[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...

2017-11-14 Thread DazhuangSu
Github user DazhuangSu commented on the issue:

https://github.com/apache/spark/pull/19691
  
Jenkins, retest this please


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2017-11-07 Thread DazhuangSu
GitHub user DazhuangSu opened a pull request:

https://github.com/apache/spark/pull/19691

[SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITION should support comparators

## What changes were proposed in this pull request?

This PR is inspired by @dongjoon-hyun.

Quoting from https://github.com/apache/spark/pull/15704:

> **What changes were proposed in this pull request?**
> This PR aims to support comparators, e.g. '<', '<=', '>', '>=', again in Apache Spark 2.0 for backward compatibility.
>
> **Spark 1.6**
> `scala> sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")`
> `res0: org.apache.spark.sql.DataFrame = [result: string]`
> `scala> sql("ALTER TABLE sales DROP PARTITION (country < 'KR')")`
> `res1: org.apache.spark.sql.DataFrame = [result: string]`
>
> **Spark 2.0**
> `scala> sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")`
> `res0: org.apache.spark.sql.DataFrame = []`
> `scala> sql("ALTER TABLE sales DROP PARTITION (country < 'KR')")`
> `org.apache.spark.sql.catalyst.parser.ParseException:`
> `mismatched input '<' expecting {')', ','}(line 1, pos 42)`
>
> After this PR, it's supported.
>
> **How was this patch tested?**
> Pass the Jenkins test with a newly added testcase.


https://github.com/apache/spark/pull/16036 points out that using an int literal in DROP PARTITION fails after patching https://github.com/apache/spark/pull/15704.
The reason for the failure in https://github.com/apache/spark/pull/15704 is that AlterTableDropPartitionCommand tells BinaryComparison and EqualTo apart with the following code:

`private def isRangeComparison(expr: Expression): Boolean = {`
`  expr.find(e => e.isInstanceOf[BinaryComparison] && !e.isInstanceOf[EqualTo]).isDefined`
`}`

This PR resolves the problem by determining the drop condition while parsing the SQL.

## How was this patch tested?
A new testcase introduced from https://github.com/apache/spark/pull/15704


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/DazhuangSu/spark SPARK-17732

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19691.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19691


commit 20f658ad8e14a94dd23bff6a8d795124d1db24e9
Author: Dylan Su 
Date:   2017-11-08T03:44:28Z

[SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITION should support comparators




---




[GitHub] spark pull request #15704: [SPARK-17732][SQL] ALTER TABLE DROP PARTITION sho...

2017-08-14 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/15704#discussion_r132893104
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -418,27 +419,55 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    specs: Seq[Expression],
     ifExists: Boolean,
     purge: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
+
+  private def isRangeComparison(expr: Expression): Boolean = {
+    expr.find(e => e.isInstanceOf[BinaryComparison] && !e.isInstanceOf[EqualTo]).isDefined
+  }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    specs.foreach { expr =>
+      expr.references.foreach { attr =>
+        if (!table.partitionColumnNames.exists(resolver(_, attr.name))) {
+          throw new AnalysisException(s"${attr.name} is not a valid partition column " +
+            s"in table ${table.identifier.quotedString}.")
+        }
+      }
     }
 
-    catalog.dropPartitions(
-      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
+    if (specs.exists(isRangeComparison)) {
+      val partitionSet = specs.flatMap { spec =>
+        val partitions = catalog.listPartitionsByFilter(table.identifier, Seq(spec)).map(_.spec)
+        if (partitions.isEmpty && !ifExists) {
+          throw new AnalysisException(s"There is no partition for ${spec.sql}")
+        }
+        partitions
+      }.distinct
+      catalog.dropPartitions(
+        table.identifier, partitionSet, ignoreIfNotExists = ifExists, purge = purge)
+    } else {
+      val normalizedSpecs = specs.map { expr =>
+        val spec = splitConjunctivePredicates(expr).map {
+          case BinaryComparison(AttributeReference(name, _, _, _), right) => name -> right.toString
+        }.toMap
+        PartitioningUtils.normalizePartitionSpec(
+          spec,
+          table.partitionColumnNames,
+          table.identifier.quotedString,
+          resolver)
+      }
+      catalog.dropPartitions(
+        table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
+    }
--- End diff --

The function isRangeComparison treats BinaryComparison and EqualTo as two different scenarios. For BinaryComparison, it lists partitions by filter, where the filters are the expressions. But for EqualTo, it obtains the TablePartitionSpec through PartitioningUtils.normalizePartitionSpec using a spec (Map[String, String]).
Why don't we just treat EqualTo as a special case of BinaryComparison and let catalog.listPartitionsByFilter do the work of producing the TablePartitionSpec? In my test, this solves the problem with int literals after patching this PR.
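
A hedged sketch of that suggestion against the `SessionCatalog` API, mirroring the calls in the diff above (Spark 2.0-era signatures); the surrounding command plumbing is elided:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.expressions.Expression

// Route every predicate, EqualTo included, through listPartitionsByFilter,
// instead of keeping a separate normalizePartitionSpec path for equality.
def dropByFilter(
    catalog: SessionCatalog,
    table: TableIdentifier,
    specs: Seq[Expression],
    ifExists: Boolean,
    purge: Boolean): Unit = {
  val toDrop = specs.flatMap { expr =>
    catalog.listPartitionsByFilter(table, Seq(expr)).map(_.spec)
  }.distinct
  catalog.dropPartitions(table, toDrop, ignoreIfNotExists = ifExists, purge = purge)
}
```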


---