[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...
Github user DazhuangSu commented on the issue: https://github.com/apache/spark/pull/19691 @maropu Sorry, I don't really have much time this month. I can close this PR so that somebody else can continue working on this problem. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...
Github user DazhuangSu commented on the issue: https://github.com/apache/spark/pull/19691 Sorry guys, I've been a little busy recently. I will fix the failing tests this weekend first.
[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...
Github user DazhuangSu commented on the issue: https://github.com/apache/spark/pull/19691 @maropu ok
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r205013855 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand( * * The syntax of this command is: * {{{ - * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]; + * ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1) + * [, PARTITION (spec2, expr2), ...] [PURGE]; * }}} */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + if (partition._1.isEmpty && !partition._2.isEmpty) { +// There are only expressions in this drop condition. +extractFromPartitionFilter(partition._2, catalog, table, resolver) + } else if (!partition._1.isEmpty && partition._2.isEmpty) { +// There are only partitionSpecs in this drop condition. +extractFromPartitionSpec(partition._1, table, resolver) + } else if (!partition._1.isEmpty && !partition._2.isEmpty) { +// This drop condition has both partitionSpecs and expressions. 
+extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect( --- End diff -- @mgaido91 I understand your point; yes, it would be inefficient. I will work on this soon.
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r193691275 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand( * * The syntax of this command is: * {{{ - * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]; + * ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1) + * [, PARTITION (spec2, expr2), ...] [PURGE]; * }}} */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + if (partition._1.isEmpty && !partition._2.isEmpty) { +// There are only expressions in this drop condition. +extractFromPartitionFilter(partition._2, catalog, table, resolver) + } else if (!partition._1.isEmpty && partition._2.isEmpty) { +// There are only partitionSpecs in this drop condition. +extractFromPartitionSpec(partition._1, table, resolver) + } else if (!partition._1.isEmpty && !partition._2.isEmpty) { +// This drop condition has both partitionSpecs and expressions. 
+extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect( --- End diff -- Hi @mgaido91, there is one problem after I changed the syntax. When I run the SQL `DROP PARTITION (p >=2)`, it throws `org.apache.spark.sql.AnalysisException: cannot resolve 'p' given input columns: []`. I'm trying to find a way to fix it. By the way, is a grammar rule like `((partitionVal (',' partitionVal)*) | (expression (',' expression)*))` legal? I wrote an ANTLR4 syntax test, but it didn't behave the way I expected. Also, I was wrong the other day. I don't think the if conditions will be inefficient when there are many partitions. They may be inefficient when there are many dropPartitionSpecs, which I don't think happens easily.
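The `cannot resolve 'p' given input columns: []` message suggests the predicate was analyzed without any child plan supplying columns. The `AlterTableDropPartitionCommand` diff in this thread avoids that: it matches each predicate's column name against `table.partitionColumnNames` with the session resolver. Below is a minimal plain-Scala sketch of that lookup, assuming case-insensitive analysis. `PartitionColumnLookup`, `caseInsensitiveResolver` and `resolvePartitionColumn` are hypothetical names, not Spark APIs.

```scala
// Hypothetical sketch (not Spark code): look up a predicate's column name among
// the table's partition columns with a resolver, similar to the diff's
// `table.partitionColumnNames.exists(resolver(_, attrName))` check.
object PartitionColumnLookup {
  // Same shape as Spark's `Resolver` type alias: (String, String) => Boolean.
  type Resolver = (String, String) => Boolean

  // Assumes case-insensitive analysis (spark.sql.caseSensitive=false).
  val caseInsensitiveResolver: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Returns the table's spelling of the column, or an error message modeled
  // on the AnalysisException text in the diff.
  def resolvePartitionColumn(
      attrName: String,
      partitionColumnNames: Seq[String],
      tableName: String,
      resolver: Resolver): Either[String, String] =
    partitionColumnNames.find(col => resolver(col, attrName)) match {
      case Some(col) => Right(col)
      case None =>
        Left(s"$attrName is not a valid partition column in table $tableName.")
    }
}
```

Returning the table's own spelling lets later steps look the column up in the partition schema without repeating the case-insensitive match.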
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r193356194 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand( * * The syntax of this command is: * {{{ - * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]; + * ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1) + * [, PARTITION (spec2, expr2), ...] [PURGE]; * }}} */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + if (partition._1.isEmpty && !partition._2.isEmpty) { +// There are only expressions in this drop condition. +extractFromPartitionFilter(partition._2, catalog, table, resolver) + } else if (!partition._1.isEmpty && partition._2.isEmpty) { +// There are only partitionSpecs in this drop condition. +extractFromPartitionSpec(partition._1, table, resolver) + } else if (!partition._1.isEmpty && !partition._2.isEmpty) { +// This drop condition has both partitionSpecs and expressions. 
+extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect( --- End diff -- I mean: how should we define `AlterTableDropPartitionCommand` better in `ddl.scala`? It needs to handle both `AlterTableDropPartitionCommand( tableName: TableIdentifier, partitions: Seq[Seq[Expression]], ifExists: Boolean, purge: Boolean, retainData: Boolean)` and `AlterTableDropPartitionCommand( tableName: TableIdentifier, partitions: Seq[TablePartitionSpec], ifExists: Boolean, purge: Boolean, retainData: Boolean)`. Maybe we could distinguish the different cases inside the method?
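One way to keep a single command signature while still telling the cases apart inside `run` is to turn each `PARTITION (...)` clause into an explicit value. Each value carries a spec, predicates, or both, so the code can pattern match instead of testing `isEmpty` on tuple fields. Below is a hypothetical plain-Scala sketch, not the PR's code. `DropClause`, `Pred` and `fromPair` are illustrative names, and a partition spec is modeled as `Map[String, String]`, like Spark's `TablePartitionSpec`.

```scala
// Hypothetical sketch (not Spark code): one value per PARTITION (...) clause.
object DropClauses {
  type TablePartitionSpec = Map[String, String]

  // Illustrative stand-in for a parsed predicate such as `p >= 2`.
  final case class Pred(column: String, op: String, value: String)

  sealed trait DropClause
  final case class SpecOnly(spec: TablePartitionSpec) extends DropClause
  final case class PredicatesOnly(preds: Seq[Pred]) extends DropClause
  final case class SpecAndPredicates(spec: TablePartitionSpec, preds: Seq[Pred])
    extends DropClause

  // Builds a clause from a parser-style (spec, predicates) pair.
  // A clause with neither part is rejected.
  def fromPair(spec: TablePartitionSpec, preds: Seq[Pred]): DropClause =
    (spec.isEmpty, preds.isEmpty) match {
      case (false, true)  => SpecOnly(spec)
      case (true, false)  => PredicatesOnly(preds)
      case (false, false) => SpecAndPredicates(spec, preds)
      case (true, true) =>
        throw new IllegalArgumentException("PARTITION clause must not be empty")
    }
}
```

Because the trait is sealed, the compiler can warn when `run` forgets one of the three cases.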
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r193171992 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand( * * The syntax of this command is: * {{{ - * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]; + * ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1) + * [, PARTITION (spec2, expr2), ...] [PURGE]; * }}} */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + if (partition._1.isEmpty && !partition._2.isEmpty) { +// There are only expressions in this drop condition. +extractFromPartitionFilter(partition._2, catalog, table, resolver) + } else if (!partition._1.isEmpty && partition._2.isEmpty) { +// There are only partitionSpecs in this drop condition. +extractFromPartitionSpec(partition._1, table, resolver) + } else if (!partition._1.isEmpty && !partition._2.isEmpty) { +// This drop condition has both partitionSpecs and expressions. 
+extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect( --- End diff -- Yeah, I agree. The hard part may be how to convert a `partitionSpec` into an `EqualTo`. I think it's better to let the `AstBuilder` handle this. If so, we may need two `AlterTableDropPartitionCommand` definitions in `ddl.scala`, one taking only `partitionSpec`s and one taking only `expression`s. But that may be a bit weird.
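Converting a partition spec into predicates amounts to two steps. Each `column -> value` pair becomes an equality predicate, and the predicates are then AND-ed together. Below is a hypothetical sketch using a tiny stand-alone expression tree. `Col`, `Lit`, `EqualTo` and `And` here only imitate the shape of Catalyst's classes; they are not Spark code. Keys are sorted so the output is deterministic.

```scala
// Hypothetical sketch: a minimal expression tree imitating Catalyst's shape.
object SpecToPredicate {
  type TablePartitionSpec = Map[String, String]

  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class Lit(value: String) extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  // PARTITION (a='1', b='2') becomes a = '1' AND b = '2'.
  // An empty spec yields None (no predicate at all).
  def specToPredicate(spec: TablePartitionSpec): Option[Expr] =
    spec.toSeq
      .sortBy(_._1)
      .map { case (column, value) => EqualTo(Col(column), Lit(value)): Expr }
      .reduceOption[Expr]((l, r) => And(l, r))
}
```

With this conversion, a spec-only clause can be handled the same way as an expression-only clause.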
[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...
Github user DazhuangSu commented on the issue: https://github.com/apache/spark/pull/19691 retest this please
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192425378 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) + 
}.reduce(And) + + val partitions = catalog.listPartitionsByFilter( +table.identifier, Seq(parts)).map(_.spec) + if (partitions.isEmpty && !ifExists) { --- End diff -- Sure. I will make this code more readable.
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192399827 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) --- End diff 
-- lol, you are right. I will update it.
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192388998 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) + 
}.reduce(And) + + val partitions = catalog.listPartitionsByFilter( +table.identifier, Seq(parts)).map(_.spec) + if (partitions.isEmpty && !ifExists) { --- End diff -- I'm a little confused. If we return `Seq.empty` to `partitionSet` in both cases, then both cases reach line 570. How can we then return an empty result to `toDrop` in the first case, but `partitionVal1` in the second case, at that line?
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192380851 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) --- End diff 
-- If we use the parsed `constant` directly without casting it to the partition column's dataType, it throws an exception:
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
    at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
    at scala.math.Ordering$Long$.compare(Ordering.scala:264)
    at scala.math.Ordering$class.gteq(Ordering.scala:91)
    at scala.math.Ordering$Long$.gteq(Ordering.scala:264)
    at org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual.nullSafeEval(predicates.scala:710)
    at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:423)
This happens for the case I mentioned above: `CREATE TABLE tbl_x (a INT) PARTITIONED BY (p LONG)` followed by `ALTER TABLE tbl_x DROP PARTITION (p >= 1)`.
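The `ClassCastException` above can be reproduced without Spark. An `INT` literal is boxed as `java.lang.Integer`, and a comparison that unboxes both sides as `Long` then fails. Below is a hypothetical plain-Scala sketch of both the failure and the workaround, which converts the literal to the column type first, in the spirit of `Cast(Literal(value.toString), dataType)` in the diff. `LiteralCoercion`, `PartType` and `coerce` are illustrative names, not Spark APIs.

```scala
// Hypothetical sketch (not Spark code): reproduce the Integer-vs-Long failure
// and avoid it by coercing the literal to the partition column's type first.
object LiteralCoercion {
  // Illustrative stand-ins for partition column types; not Spark's DataType.
  sealed trait PartType
  case object IntType extends PartType
  case object LongType extends PartType
  case object StringType extends PartType

  // Converts through the string form, so a boxed Integer 1 becomes a boxed
  // Long 1L for a LONG column. Throws NumberFormatException for e.g. "test".
  def coerce(value: Any, target: PartType): Any = target match {
    case IntType    => value.toString.toInt
    case LongType   => value.toString.toLong
    case StringType => value.toString
  }

  // Compares two boxed values as Longs, mirroring how the stack trace shows
  // Ordering.Long unboxing both inputs; a boxed Integer throws here.
  def gteqAsLong(left: Any, right: Any): Boolean =
    Ordering.Long.gteq(left.asInstanceOf[Long], right.asInstanceOf[Long])
}
```

Going through `toString` is the simplest uniform conversion. The trade-off is that a literal that does not parse as the column type (for example `'test'` against a `LONG` column) surfaces as a `NumberFormatException`, not as an analysis error.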
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192147689 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) + 
}.reduce(And) + + val partitions = catalog.listPartitionsByFilter( +table.identifier, Seq(parts)).map(_.spec) + if (partitions.isEmpty && !ifExists) { --- End diff -- Let me explain these two cases more clearly with two SQL statements, where `useless_expression` means an expression that matches no partitions: `ALTER TABLE DROP PARTITION(partitionVal1, useless_expression)` and `ALTER TABLE DROP PARTITION(partitionVal1)`. The first statement should drop `partitionVal1` intersect `useless_expression`, which is empty. The second statement should drop `partitionVal1`. If we return `Seq.empty` to `partitionSet` for both statements, it will be impossible to tell them apart later.
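`Option` expresses this distinction directly: `None` means the clause had no predicate, and `Some(Set.empty)` means the predicate was given but matched nothing. Below is a hypothetical sketch with partitions modeled as `Map[String, String]` specs. `DropTargets` and `partitionsToDrop` are illustrative names, not code from the PR.

```scala
// Hypothetical model (not Spark code): each part of one PARTITION (...) clause
// is None when absent and Some(matches) when present, so "no predicate given"
// and "predicate matched nothing" stay distinguishable.
object DropTargets {
  type TablePartitionSpec = Map[String, String]

  def partitionsToDrop(
      specMatches: Option[Set[TablePartitionSpec]],
      filterMatches: Option[Set[TablePartitionSpec]]): Set[TablePartitionSpec] =
    (specMatches, filterMatches) match {
      case (Some(s), Some(f)) => s.intersect(f) // PARTITION (partitionVal1, expr)
      case (Some(s), None)    => s              // PARTITION (partitionVal1)
      case (None, Some(f))    => f              // PARTITION (expr)
      case (None, None)       => Set.empty      // empty clause: nothing to drop
    }
}
```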
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192130248 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) --- End diff 
-- OK, I get your point. I just ran a quick test. It threw the exception "java.lang.RuntimeException: Unsupported literal type class org.apache.spark.unsafe.types.UTF8String" at this line when I ran: `ALTER TABLE table_a PARTITION(a < 'test')`. So a one-line change in `literals.scala` is needed: the method `def apply(v: Any): Literal` (literals.scala: line 52) only supports `String`, not `UTF8String`, for now.
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192115083 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) --- End diff 
-- I don't think we can throw an AnalysisException in every situation. For example: `CREATE TABLE tbl_x (a INT) PARTITIONED BY (p LONG)` followed by `ALTER TABLE tbl_x DROP PARTITION (p < 1)`. In this case the partition column's dataType is definitely `LONG`, but the constant's dataType is `INT`. I think it's reasonable to support at least this situation.
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r191823358 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +313,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map without optional values + * and a partition filter specification. + */ + protected def visitPartition( --- End diff -- I tried to add a new parameter to `AlterTableDropPartitionCommand` earlier, but it was kind of hard. Consider this SQL: `DROP PARTITION(partitionVal1, expression1), PARTITION(partitionVal2, expression2)`. The partitions to drop are (`partitionVal1` intersect `expression1`) union (`partitionVal2` intersect `expression2`). Keeping a spec and its expressions in one tuple tells us that `partitionVal1` and `expression1` come from the same `partitionSpec`, so we should use `intersect`. Separate tuples mean that (`partitionVal1` intersect `expression1`) and (`partitionVal2` intersect `expression2`) come from different `partitionSpec`s, so we should use `union`. Without the tuple, it would be hard to tell these cases apart. It would also be hard to decide between `intersect` and `union` when `partitionVal1` is combined with `expression1` or `expression2`. Any ideas for replacing this `tuple`?
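The rule described above is: intersect within a clause, union across clauses. It maps directly onto a fold over the per-clause tuples. Below is a hypothetical sketch in which each tuple holds the partitions matched by a clause's spec part and by its expression part. It assumes both parts of every clause are present; a clause with only one part would need separate handling. `DropClauseSemantics` and `toDrop` are illustrative names.

```scala
// Hypothetical model (not Spark code): each tuple is one PARTITION (...) clause,
// holding the partitions matched by its spec part and by its expression part.
object DropClauseSemantics {
  type TablePartitionSpec = Map[String, String]

  // Intersect within a clause, then union across clauses:
  // (spec1 intersect expr1) union (spec2 intersect expr2) union ...
  def toDrop(
      clauses: Seq[(Set[TablePartitionSpec], Set[TablePartitionSpec])]): Set[TablePartitionSpec] =
    clauses
      .map { case (bySpec, byExpr) => bySpec.intersect(byExpr) }
      .foldLeft(Set.empty[TablePartitionSpec])(_ union _)
}
```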
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r191805297 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) --- End diff 
-- The constant's data type may differ from the partition column's data type, and that mismatch may cause problems later when the expression compares the two.
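A minimal sketch of why the literal should be cast to the partition column's type before comparing. It assumes partition values are stored as strings (as in the Hive metastore). `PartType`, `compareAs`, and `lessThan` are hypothetical stand-ins, not Spark APIs; in the diff above the corresponding step is `Cast(Literal(value.toString), dataType)`.

```scala
object CastSketch {
  // Hypothetical, simplified stand-in for a partition column's data type.
  sealed trait PartType
  case object IntPart extends PartType
  case object StringPart extends PartType

  // Compare a stored partition value (kept as a string, as in the metastore) with a
  // literal, first interpreting both according to the partition column's type.
  def compareAs(t: PartType, stored: String, literal: String): Int = t match {
    case IntPart    => Integer.compare(stored.toInt, literal.toInt)
    case StringPart => stored.compareTo(literal)
  }

  // Keep the partition values that are strictly less than the literal.
  def lessThan(t: PartType, values: Seq[String], literal: String): Seq[String] =
    values.filter(v => compareAs(t, v, literal) < 0)
}
```

With `IntPart`, `lessThan(IntPart, Seq("2", "10", "30"), "3")` keeps only `"2"`. With `StringPart`, it also keeps `"10"`, because the strings are compared lexicographically. That is the kind of mismatch the comment warns about.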
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r191795550 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) + 
}.reduce(And) + + val partitions = catalog.listPartitionsByFilter( +table.identifier, Seq(parts)).map(_.spec) + if (partitions.isEmpty && !ifExists) { --- End diff -- There are two cases in which the expression filters of a single `DROP PARTITION` statement produce an empty Seq here: 1. there is at least one filter, but no partition matches it; 2. there are no filters at all. Without this check, the later logic becomes confusing. In the first case we should `intersect` the result with `normalizedPartitionSpec`, but in the second case we shouldn't, because the intersection would return an empty result. With this check, the two cases can be handled separately: 1. throw an exception directly, regardless of `normalizedPartitionSpec`; 2. return `Seq.empty`.
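The two cases above can be sketched as follows. This is a simplified model, not the PR's code: partitions are plain `Map[String, String]` specs, filters are ordinary predicates, and `resolveDrop` is a hypothetical helper. It shows the control flow described in the comment. With no filters, only the explicit spec is used. When filters match nothing, the command either fails or drops nothing, depending on `IF EXISTS`. Otherwise, the filter matches are intersected with the spec.

```scala
object DropSketch {
  // A partition spec such as Map("country" -> "US", "quarter" -> "1").
  type Spec = Map[String, String]

  // True if the partition agrees with every key/value pair of a (possibly partial) spec.
  private def matchesSpec(partition: Spec, spec: Spec): Boolean =
    spec.forall { case (k, v) => partition.get(k).contains(v) }

  // Hypothetical helper combining an explicit spec with comparison filters.
  def resolveDrop(
      existing: Seq[Spec],
      spec: Spec,
      filters: Seq[Spec => Boolean],
      ifExists: Boolean): Seq[Spec] = {
    if (filters.isEmpty) {
      // Case 2: no filters, so the explicit spec alone decides what is dropped.
      existing.filter(matchesSpec(_, spec))
    } else {
      val byFilter = existing.filter(p => filters.forall(f => f(p)))
      if (byFilter.isEmpty) {
        // Case 1: filters exist but match nothing; skip the intersection entirely.
        if (!ifExists) throw new IllegalArgumentException("There is no partition for the given filters")
        Seq.empty
      } else {
        // Both present: keep only partitions matched by both the filters and the spec.
        byFilter.filter(matchesSpec(_, spec))
      }
    }
  }
}
```

Separating the "no filters" case from the "filters matched nothing" case keeps the intersection from silently turning a missing partition into an empty drop.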
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r191492537 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -282,6 +282,26 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging parts.toMap } + /** + * Create a partition filter specification. + */ + def visitPartitionFilterSpec(ctx: PartitionSpecContext): Seq[Expression] = withOrigin(ctx) { +val parts = ctx.expression.asScala.map { pVal => + expression(pVal) match { +case EqualNullSafe(_, _) => + throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx) +case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => + cmp +case bc @ BinaryComparison(constant: Literal, _) => + throw new ParseException("Literal " + constant --- End diff -- Sorry, I was careless. Will fix this.
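A simplified sketch of the validation that `visitPartitionFilterSpec` performs. It uses a hypothetical mini-AST (`Attr`, `Lit`, `Cmp`) instead of Catalyst's `Expression` classes. `<=>` is rejected, `attribute op literal` is accepted, and a literal on the left-hand side is rejected. The error messages for the last two cases are illustrative assumptions, not the PR's exact wording.

```scala
object ParseSketch {
  // Hypothetical mini-AST standing in for Catalyst expressions.
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(value: String) extends Expr
  // A binary comparison such as `country < 'KR'`; `op` holds the operator symbol.
  final case class Cmp(op: String, left: Expr, right: Expr) extends Expr

  final class ParseError(msg: String) extends Exception(msg)

  // Accept only `attribute op literal`; reject `<=>` and literals on the left-hand side.
  def validate(e: Expr): Cmp = e match {
    case Cmp("<=>", _, _) =>
      throw new ParseError("'<=>' operator is not allowed in partition specification.")
    case c @ Cmp(_, Attr(_), Lit(_)) => c
    case Cmp(_, Lit(v), _) =>
      throw new ParseError(s"Literal $v must appear on the right-hand side.")
    case _ =>
      throw new ParseError("Invalid partition filter specification")
  }
}
```

The `<=>` case has to come first, because `country <=> 'KR'` would otherwise match the general `attribute op literal` case.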
[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...
Github user DazhuangSu commented on the issue: https://github.com/apache/spark/pull/19691 @mgaido91 It's totally OK to do it in the parser. I was only concerned that it might require more changes to SqlBase than the current approach, just for this one case.
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r190265826 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging parts.toMap } + /** + * Create a partition filter specification. + */ + def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) { +val parts = ctx.expression.asScala.map { pVal => + expression(pVal) match { +case EqualNullSafe(_, _) => + throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx) +case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => + cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant)) +case _ => + throw new ParseException("Invalid partition filter specification", ctx) + } +} +if(parts.isEmpty) { --- End diff -- you're right. I will change this.
[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...
Github user DazhuangSu commented on the issue: https://github.com/apache/spark/pull/19691 @mgaido91 Would it be OK to support a Literal only on the right-hand side, like this? https://github.com/apache/spark/pull/19691/files#diff-9847f5cef7cf7fbc5830fbc6b779ee10R295
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r190221738 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -495,6 +496,150 @@ class HiveDDLSuite } } + def testDropPartition(dataType: DataType, value: Any): Unit = { --- End diff -- Ok
[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...
Github user DazhuangSu commented on the issue: https://github.com/apache/spark/pull/19691 @mgaido91 What's your opinion on this review so far? https://github.com/apache/spark/pull/19691/files#r179940757 I think using `intersect` is more reasonable.
[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...
Github user DazhuangSu commented on the issue: https://github.com/apache/spark/pull/19691 @maropu @mgaido91 The patch has passed all tests. Any advice, please?
[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...
Github user DazhuangSu commented on the issue: https://github.com/apache/spark/pull/19691 @mgaido91 Sorry, I've been a little busy recently. The PR is almost ready; I will update it soon.
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r181860148 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging parts.toMap } + /** + * Create a partition filter specification. + */ + def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) { +val parts = ctx.expression.asScala.map { pVal => + expression(pVal) match { +case EqualNullSafe(_, _) => + throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx) +case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => + cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant)) --- End diff -- OK. I'll work on this in the next few days.
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r181671014 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,58 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Expression)], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2 != null) { + partition._2.references.foreach { attr => +if (!table.partitionColumnNames.exists(resolver(_, attr.name))) { + throw new AnalysisException(s"${attr.name} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} + } +val partitions = catalog.listPartitionsByFilter( + table.identifier, Seq(partition._2)).map(_.spec) +if (partitions.isEmpty && !ifExists) { + throw new AnalysisException(s"There is no partition for ${partition._2.sql}") +} +partitions +} else { + Seq.empty[TablePartitionSpec] +} + }.distinct + + if (normalizedSpecs.isEmpty && 
partitionSet.isEmpty) { --- End diff -- I tried this command in Hive, and Hive dropped only the intersection of the two partition filters.
[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...
Github user DazhuangSu commented on the issue: https://github.com/apache/spark/pull/19691 @gatorsmile @dongjoon-hyun Could you give me some advice please?
[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...
Github user DazhuangSu commented on the issue: https://github.com/apache/spark/pull/19691 Jenkins, retest this please
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
GitHub user DazhuangSu opened a pull request: https://github.com/apache/spark/pull/19691 [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITION should support comparators ## What changes were proposed in this pull request? This PR is inspired by @dongjoon-hyun. Quoting from https://github.com/apache/spark/pull/15704: > **What changes were proposed in this pull request?** This PR aims to support comparators, e.g. '<', '<=', '>', '>=', again in Apache Spark 2.0 for backward compatibility. **Spark 1.6** `scala> sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)") res0: org.apache.spark.sql.DataFrame = [result: string]` `scala> sql("ALTER TABLE sales DROP PARTITION (country < 'KR')") res1: org.apache.spark.sql.DataFrame = [result: string]` **Spark 2.0** `scala> sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)") res0: org.apache.spark.sql.DataFrame = []` `scala> sql("ALTER TABLE sales DROP PARTITION (country < 'KR')")` `org.apache.spark.sql.catalyst.parser.ParseException:` `mismatched input '<' expecting {')', ','}(line 1, pos 42)` After this PR, it's supported. **How was this patch tested?** Pass the Jenkins test with a newly added testcase. https://github.com/apache/spark/pull/16036 points out that using an int literal in DROP PARTITION fails after applying https://github.com/apache/spark/pull/15704. The reason for this failure in https://github.com/apache/spark/pull/15704 is that AlterTableDropPartitionCommand distinguishes BinaryComparison from EqualTo with the following code: `private def isRangeComparison(expr: Expression): Boolean = {` `expr.find(e => e.isInstanceOf[BinaryComparison] && !e.isInstanceOf[EqualTo]).isDefined` `}` This PR resolves the problem by determining the kind of drop condition while parsing the SQL statement. ## How was this patch tested?
A new test case introduced in https://github.com/apache/spark/pull/15704. You can merge this pull request into a Git repository by running: $ git pull https://github.com/DazhuangSu/spark SPARK-17732 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19691.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19691 commit 20f658ad8e14a94dd23bff6a8d795124d1db24e9 Author: Dylan Su Date: 2017-11-08T03:44:28Z [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITION should support comparators
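The `isRangeComparison` check quoted above can be sketched on a hypothetical mini-AST. `Expr`, `EqualTo`, `LessThan`, `And`, and `find` below are simplified stand-ins for Catalyst's classes and `TreeNode.find`, not the real API. The check returns true when any comparison other than equality appears anywhere in the condition. The quoted #15704 code uses this to decide whether a condition goes through the filter path or the string-based spec path.

```scala
object RangeCheckSketch {
  // Hypothetical mini-AST standing in for Catalyst expressions.
  sealed trait Expr { def children: Seq[Expr] = Nil }
  final case class Attr(name: String) extends Expr
  final case class Lit(value: Any) extends Expr
  sealed trait Comparison extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Comparison {
    override def children: Seq[Expr] = Seq(left, right)
  }
  final case class LessThan(left: Expr, right: Expr) extends Comparison {
    override def children: Seq[Expr] = Seq(left, right)
  }
  final case class And(left: Expr, right: Expr) extends Expr {
    override def children: Seq[Expr] = Seq(left, right)
  }

  // Pre-order search for the first node satisfying p (orElse is by-name, so it stops early).
  def find(e: Expr)(p: Expr => Boolean): Option[Expr] =
    if (p(e)) Some(e)
    else e.children.foldLeft(Option.empty[Expr])((acc, c) => acc.orElse(find(c)(p)))

  // Mirrors the quoted check: true if any comparison other than EqualTo appears.
  def isRangeComparison(e: Expr): Boolean =
    find(e) {
      case _: EqualTo    => false
      case _: Comparison => true
      case _             => false
    }.isDefined
}
```

Under this split, a pure equality such as `p = 1` never reaches the filter path. That path is the one the int-literal report in https://github.com/apache/spark/pull/16036 concerns.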
[GitHub] spark pull request #15704: [SPARK-17732][SQL] ALTER TABLE DROP PARTITION sho...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/15704#discussion_r132893104 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -418,27 +419,55 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +specs: Seq[Expression], ifExists: Boolean, purge: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { + + private def isRangeComparison(expr: Expression): Boolean = { +expr.find(e => e.isInstanceOf[BinaryComparison] && !e.isInstanceOf[EqualTo]).isDefined + } override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +specs.foreach { expr => + expr.references.foreach { attr => +if (!table.partitionColumnNames.exists(resolver(_, attr.name))) { + throw new AnalysisException(s"${attr.name} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} + } } -catalog.dropPartitions( - table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge) +if (specs.exists(isRangeComparison)) { + val partitionSet = specs.flatMap { spec => +val partitions = catalog.listPartitionsByFilter(table.identifier, Seq(spec)).map(_.spec) +if (partitions.isEmpty && !ifExists) { + throw new AnalysisException(s"There is no partition for ${spec.sql}") +} +partitions + }.distinct + catalog.dropPartitions( +table.identifier, partitionSet, 
ignoreIfNotExists = ifExists, purge = purge) +} else { + val normalizedSpecs = specs.map { expr => +val spec = splitConjunctivePredicates(expr).map { + case BinaryComparison(AttributeReference(name, _, _, _), right) => name -> right.toString +}.toMap +PartitioningUtils.normalizePartitionSpec( + spec, + table.partitionColumnNames, + table.identifier.quotedString, + resolver) + } + catalog.dropPartitions( +table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge) +} --- End diff -- The function isRangeComparison treats BinaryComparison and EqualTo as two different scenarios. For a BinaryComparison, the command lists partitions using filters defined by expressions. For EqualTo, however, it gets a TablePartitionSpec through PartitioningUtils.normalizePartitionSpec using spec (a Map[String, String]). Why don't we just treat EqualTo as a special case of BinaryComparison and let catalog.listPartitionsByFilter do the work of getting the TablePartitionSpec? In my test, this solves the int literal problem after applying this PR.
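A minimal sketch of that suggestion, under simplifying assumptions. Partitions are string-valued maps, as in the metastore. Each column has a declared type name. Equality is just one more operator, evaluated by the same filter as the range comparisons. `Cond`, `partitionsByFilter`, and the `"int"` type tag are hypothetical; in Spark itself the filtering would be delegated to `catalog.listPartitionsByFilter`.

```scala
object UnifiedFilterSketch {
  type Spec = Map[String, String]

  // A single comparison `column op literal`; "=" is handled like any other operator.
  final case class Cond(column: String, op: String, literal: String)

  // Compare a stored value with a literal according to the column's declared type
  // (hypothetical tags: "int" compares numerically, anything else as strings).
  private def compare(colType: String, stored: String, literal: String): Int =
    if (colType == "int") Integer.compare(stored.toInt, literal.toInt)
    else stored.compareTo(literal)

  private def holds(c: Cond, p: Spec, types: Map[String, String]): Boolean = {
    val r = compare(types.getOrElse(c.column, "string"), p(c.column), c.literal)
    c.op match {
      case "="   => r == 0
      case "<"   => r < 0
      case "<="  => r <= 0
      case ">"   => r > 0
      case ">="  => r >= 0
      case other => throw new IllegalArgumentException(s"Unsupported operator: $other")
    }
  }

  // Every condition, equality included, is evaluated as a filter over the listed partitions.
  def partitionsByFilter(existing: Seq[Spec], types: Map[String, String], conds: Seq[Cond]): Seq[Spec] =
    existing.filter(p => conds.forall(holds(_, p, types)))
}
```

Because equality goes through the same type-aware comparison, `p = 10` on an int column matches the stored value `"10"` without a separate string-based spec path.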