[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19691 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r205047473 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand( * * The syntax of this command is: * {{{ - * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]; + * ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1) + * [, PARTITION (spec2, expr2), ...] [PURGE]; * }}} */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + if (partition._1.isEmpty && !partition._2.isEmpty) { +// There are only expressions in this drop condition. +extractFromPartitionFilter(partition._2, catalog, table, resolver) + } else if (!partition._1.isEmpty && partition._2.isEmpty) { +// There are only partitionSpecs in this drop condition. +extractFromPartitionSpec(partition._1, table, resolver) + } else if (!partition._1.isEmpty && !partition._2.isEmpty) { +// This drop condition has both partitionSpecs and expressions. 
+extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect( --- End diff -- thank you @DazhuangSu --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r205013855 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand( * * The syntax of this command is: * {{{ - * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]; + * ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1) + * [, PARTITION (spec2, expr2), ...] [PURGE]; * }}} */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + if (partition._1.isEmpty && !partition._2.isEmpty) { +// There are only expressions in this drop condition. +extractFromPartitionFilter(partition._2, catalog, table, resolver) + } else if (!partition._1.isEmpty && partition._2.isEmpty) { +// There are only partitionSpecs in this drop condition. +extractFromPartitionSpec(partition._1, table, resolver) + } else if (!partition._1.isEmpty && !partition._2.isEmpty) { +// This drop condition has both partitionSpecs and expressions. 
+extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect( --- End diff -- @mgaido91 I understand your point, yes it would be inefficient. I will work on this soon --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r202637455 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand( * * The syntax of this command is: * {{{ - * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]; + * ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1) + * [, PARTITION (spec2, expr2), ...] [PURGE]; * }}} */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + if (partition._1.isEmpty && !partition._2.isEmpty) { +// There are only expressions in this drop condition. +extractFromPartitionFilter(partition._2, catalog, table, resolver) + } else if (!partition._1.isEmpty && partition._2.isEmpty) { +// There are only partitionSpecs in this drop condition. +extractFromPartitionSpec(partition._1, table, resolver) + } else if (!partition._1.isEmpty && !partition._2.isEmpty) { +// This drop condition has both partitionSpecs and expressions. 
+extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect( --- End diff -- @DazhuangSu sorry I missed your last comment somehow. Why do you say it would not be inefficient if you have a lot of partitions? I think it would be! Imagine that you partition per year and day. And you want to get the first 6 months of this year. The spec would be something like `(year = 2018, day < 2018-07-01)`. Imagine we have a 10-year history. With the current implementation, we would get back basically all the partitions from the filter, i.e. roughly 3,650, and then it would intersect those. Anyway, my understanding is that such a case would not even work properly, as it would try to drop the intersection of: ``` Seq(Seq("year"-> "2018", "day" -> "2018-01-01", ...)).intersect(Seq(Map("year"->"2018"))) ``` which would result in an empty Seq, so we would drop nothing. Moreover, I saw no test for this case in the tests. Can we add tests for this use case and can we add support for it if my understanding that it is not working is right? Thanks --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r193691275 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand( * * The syntax of this command is: * {{{ - * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]; + * ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1) + * [, PARTITION (spec2, expr2), ...] [PURGE]; * }}} */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + if (partition._1.isEmpty && !partition._2.isEmpty) { +// There are only expressions in this drop condition. +extractFromPartitionFilter(partition._2, catalog, table, resolver) + } else if (!partition._1.isEmpty && partition._2.isEmpty) { +// There are only partitionSpecs in this drop condition. +extractFromPartitionSpec(partition._1, table, resolver) + } else if (!partition._1.isEmpty && !partition._2.isEmpty) { +// This drop condition has both partitionSpecs and expressions. 
+extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect( --- End diff -- Hi @mgaido91, there is one problem after I changed the syntax: when I run the SQL `DROP PARTITION (p >=2)` it throws `org.apache.spark.sql.AnalysisException: cannot resolve 'p' given input columns: []`. I'm trying to find a way to fix it. By the way, is a syntax like `((partitionVal (',' partitionVal)*) | (expression (',' expression)*))` legal? I ask because I wrote an ANTLR4 syntax test, but it didn't work as I expected. Besides, I was wrong that day. I think the if conditions won't be inefficient if there are a lot of partitions. It may be inefficient if there are a lot of dropPartitionSpec entries, which I don't think can happen easily. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r193358172 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand( * * The syntax of this command is: * {{{ - * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]; + * ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1) + * [, PARTITION (spec2, expr2), ...] [PURGE]; * }}} */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + if (partition._1.isEmpty && !partition._2.isEmpty) { +// There are only expressions in this drop condition. +extractFromPartitionFilter(partition._2, catalog, table, resolver) + } else if (!partition._1.isEmpty && partition._2.isEmpty) { +// There are only partitionSpecs in this drop condition. +extractFromPartitionSpec(partition._1, table, resolver) + } else if (!partition._1.isEmpty && !partition._2.isEmpty) { +// This drop condition has both partitionSpecs and expressions. 
+extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect( --- End diff -- I think we can (must) just have a single: `AlterTableDropPartitionCommand( tableName: TableIdentifier, partitionSpecs: Seq[TablePartitionSpec], partitionExprs: Seq[Seq[Expression]], ifExists: Boolean, purge: Boolean, retainData: Boolean)`. Indeed, we might have something like: ``` alter table foo drop partition (year=2017, month=12), partition(year=2018, month < 3); ``` where we have both a partition spec and an expression specification. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r193356194 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand( * * The syntax of this command is: * {{{ - * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]; + * ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1) + * [, PARTITION (spec2, expr2), ...] [PURGE]; * }}} */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + if (partition._1.isEmpty && !partition._2.isEmpty) { +// There are only expressions in this drop condition. +extractFromPartitionFilter(partition._2, catalog, table, resolver) + } else if (!partition._1.isEmpty && partition._2.isEmpty) { +// There are only partitionSpecs in this drop condition. +extractFromPartitionSpec(partition._1, table, resolver) + } else if (!partition._1.isEmpty && !partition._2.isEmpty) { +// This drop condition has both partitionSpecs and expressions. 
+extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect( --- End diff -- I mean how to define `AlterTableDropPartitionCommand` better in `ddl.scala`. need to handle both `AlterTableDropPartitionCommand( tableName: TableIdentifier, partitions: Seq[Seq[Expression]], ifExists: Boolean, purge: Boolean, retainData: Boolean)` and `AlterTableDropPartitionCommand( tableName: TableIdentifier, partitions: Seq[TablePartitionSpec], ifExists: Boolean, purge: Boolean, retainData: Boolean)` Maybe telling the different cases inside the method? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r193347767 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand( * * The syntax of this command is: * {{{ - * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]; + * ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1) + * [, PARTITION (spec2, expr2), ...] [PURGE]; * }}} */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + if (partition._1.isEmpty && !partition._2.isEmpty) { +// There are only expressions in this drop condition. +extractFromPartitionFilter(partition._2, catalog, table, resolver) + } else if (!partition._1.isEmpty && partition._2.isEmpty) { +// There are only partitionSpecs in this drop condition. +extractFromPartitionSpec(partition._1, table, resolver) + } else if (!partition._1.isEmpty && !partition._2.isEmpty) { +// This drop condition has both partitionSpecs and expressions. 
+extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect( --- End diff -- Why? Isn't something like the following enough: ``` ((partitionVal (',' partitionVal)*) | (expression (',' expression)*)) ``` ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r193171992 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand( * * The syntax of this command is: * {{{ - * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]; + * ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1) + * [, PARTITION (spec2, expr2), ...] [PURGE]; * }}} */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + if (partition._1.isEmpty && !partition._2.isEmpty) { +// There are only expressions in this drop condition. +extractFromPartitionFilter(partition._2, catalog, table, resolver) + } else if (!partition._1.isEmpty && partition._2.isEmpty) { +// There are only partitionSpecs in this drop condition. +extractFromPartitionSpec(partition._1, table, resolver) + } else if (!partition._1.isEmpty && !partition._2.isEmpty) { +// This drop condition has both partitionSpecs and expressions. 
+extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect( --- End diff -- Yeah, I agree. And the hard part may be how to convert a `partitionSpec` to an `EqualTo`. I think it's better to let the `AstBuilder` handle this. If so, we may have to have two `AlterTableDropPartitionCommand` instances in `ddl.scala`, one for all `partitionSpec` and one for all `expression`. But it may be a bit weird. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r193002268 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand( * * The syntax of this command is: * {{{ - * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]; + * ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1) + * [, PARTITION (spec2, expr2), ...] [PURGE]; * }}} */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + if (partition._1.isEmpty && !partition._2.isEmpty) { +// There are only expressions in this drop condition. +extractFromPartitionFilter(partition._2, catalog, table, resolver) + } else if (!partition._1.isEmpty && partition._2.isEmpty) { +// There are only partitionSpecs in this drop condition. +extractFromPartitionSpec(partition._1, table, resolver) + } else if (!partition._1.isEmpty && !partition._2.isEmpty) { +// This drop condition has both partitionSpecs and expressions. 
+extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect( --- End diff -- I think this may be quite inefficient if we have a lot of partitions. What about converting the `partitionSpec` into `EqualTo` expressions and adding them as conditions? It would be great IMO if we can achieve this by enforcing in the syntax that we have either all `partitionSpec`s or all `expression`s. So if we have all `partition = value`, we have a `partitionSpec`, while if at least one is a comparison different from `=`, we have all `expression`s (including the `=`s). What do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r193000314 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand( * * The syntax of this command is: * {{{ - * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]; + * ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1) + * [, PARTITION (spec2, expr2), ...] [PURGE]; * }}} */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + if (partition._1.isEmpty && !partition._2.isEmpty) { +// There are only expressions in this drop condition. +extractFromPartitionFilter(partition._2, catalog, table, resolver) + } else if (!partition._1.isEmpty && partition._2.isEmpty) { +// There are only partitionSpecs in this drop condition. +extractFromPartitionSpec(partition._1, table, resolver) + } else if (!partition._1.isEmpty && !partition._2.isEmpty) { +// This drop condition has both partitionSpecs and expressions. 
+extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect( + extractFromPartitionSpec(partition._1, table, resolver)) + } else { +Seq.empty[TablePartitionSpec] + } } catalog.dropPartitions( - table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge, + table.identifier, toDrop, ignoreIfNotExists = ifExists, purge = purge, retainData = retainData) CommandUtils.updateTableStats(sparkSession, table) Seq.empty[Row] } + private def extractFromPartitionSpec( + specs: TablePartitionSpec, + table: CatalogTable, + resolver: Resolver): Seq[Map[String, String]] = { +Seq(PartitioningUtils.normalizePartitionSpec( + specs, + table.partitionColumnNames, + table.identifier.quotedString, + resolver)) + } + + private def extractFromPartitionFilter( + filters: Seq[Expression], + catalog: SessionCatalog, + table: CatalogTable, + resolver: Resolver): Seq[TablePartitionSpec] = { +val expressions = filters.map { expr => + val (attrName, constant) = expr match { +case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => + (name, constant) + } + if (!table.partitionColumnNames.exists(resolver(_, attrName))) { +throw new AnalysisException(s"${attrName} is not a valid partition column " + + s"in table ${table.identifier.quotedString}.") + } + val dataType = table.partitionSchema.apply(attrName).dataType + expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), +Cast(constant, dataType))) --- End diff -- nit: can we add the cast only when needed, ie. `dataType != constant.dataType`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192425378 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) + 
+ }.reduce(And) + + val partitions = catalog.listPartitionsByFilter( +table.identifier, Seq(parts)).map(_.spec) + if (partitions.isEmpty && !ifExists) { --- End diff -- Sure. I will make this code more readable. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192416800 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) + }.reduce(And) 
+ + val partitions = catalog.listPartitionsByFilter( +table.identifier, Seq(parts)).map(_.spec) + if (partitions.isEmpty && !ifExists) { --- End diff -- Oh, I see now. Well, this is getting very involved... Can we split the cases into different methods? I think we can have the 4 cases like: ``` if (partition._1.isEmpty && !partition._2.isEmpty){ // extract from partition._2 } else if (!partition._1.isEmpty && partition._2.isEmpty) { // extract from partition._1 } else if (!partition._1.isEmpty && !partition._2.isEmpty) { // intersect } else { // return empty seq } ``` Maybe with some comments to explain when each of these cases can happen. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192399827 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) --- End diff 
-- lol. you are right. I will update --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192388998 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) + 
}.reduce(And) + + val partitions = catalog.listPartitionsByFilter( +table.identifier, Seq(parts)).map(_.spec) + if (partitions.isEmpty && !ifExists) { --- End diff -- I'm a little confused. If we return `Seq.empty` to `partitionSet` in both cases, then both cases will reach line 570. How can we return an empty result to `toDrop` for the first case and return `partitionVal1` for the second case at this line? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192382198 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) --- End diff -- 
I see, but what about `Cast(constant, dataType)`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192380851 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) --- End diff 
-- If we use the parsed `constant` and don't cast it to the partition's dataType, it will throw an exception: >java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105) at scala.math.Ordering$Long$.compare(Ordering.scala:264) at scala.math.Ordering$class.gteq(Ordering.scala:91) at scala.math.Ordering$Long$.gteq(Ordering.scala:264) at org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual.nullSafeEval(predicates.scala:710) at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:423) for the case `CREATE TABLE tbl_x (a INT) PARTITIONED BY (p LONG)` `ALTER TABLE tbl_x DROP PARTITION (p >= 1)` that I mentioned above. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192367934 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) + }.reduce(And) 
+ + val partitions = catalog.listPartitionsByFilter( +table.identifier, Seq(parts)).map(_.spec) + if (partitions.isEmpty && !ifExists) { --- End diff -- yes, but in the first case `toDrop` would be empty, in the second case it would contain `partitionVal1`. So when it is passed later to `dropPartitions`, this method checks if it is empty or not. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192367608 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) --- End diff -- 
No, that change is not needed. Why create a new literal from the value? We can use the parsed literal. We don't have to change the Literal class. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192147689 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) + 
}.reduce(And) + + val partitions = catalog.listPartitionsByFilter( +table.identifier, Seq(parts)).map(_.spec) + if (partitions.isEmpty && !ifExists) { --- End diff -- Let me explain these two cases more clearly. Take two SQL statements as an example (`useless_expression` means that no partitions match the expression): `ALTER TABLE DROP PARTITION(partitionVal1, useless_expression)` `ALTER TABLE DROP PARTITION(partitionVal1)` The first statement should drop partition `partitionVal1` intersect `useless_expression`, which is empty. The second statement should drop partition `partitionVal1`. If we return `Seq.empty` to `partitionSet` for both statements, it will be impossible to tell them apart later. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192130248 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) --- End diff 
-- OK, I get your point. I just ran a quick test. It threw an exception "java.lang.RuntimeException: Unsupported literal type class org.apache.spark.unsafe.types.UTF8String" at this line when I ran: `ALTER TABLE table_a PARTITION(a < 'test')` so a one-line change in `literals.scala` is needed. The method `def apply(v: Any): Literal` (literals.scala: line 52) only supports `String`, not `UTF8String`, for now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192117302 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) --- End diff -- 
I agree, but this case definitely doesn't need to go through converting to a string, creating a string literal again, and casting to long. I think that the cast is automatically performed, or if it is not, we can just add the cast on the incoming constant. Do you agree? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192115083 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) --- End diff 
-- I think we can't throw AnalysisException for all the situations. e.g. `CREATE TABLE tbl_x (a INT) PARTITIONED BY (p LONG)` `ALTER TABLE tbl_x DROP PARTITION (p < 1)` In this case, the partition's dataType is `LONG` for sure. But the constant's dataType is `INT` I think it's reasonable to support this situation at least. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192100229 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) + }.reduce(And) 
+ + val partitions = catalog.listPartitionsByFilter( +table.identifier, Seq(parts)).map(_.spec) + if (partitions.isEmpty && !ifExists) { --- End diff -- Sorry, I don't really get what you mean. If we have no filters we are returning an empty `Seq` (the check is at line 539). So here we are in case 1, i.e. there is a filter and it returns no partitions. If we avoid this if, my understanding is that we return `partitions` - which is empty - to `partitionSet`. Then `toDrop` would also be empty. The result is that we call `dropPartitions` with an empty Seq and it will throw the `AnalysisException` (instead of doing it here). So I think this is useless. Am I wrong? PS: all these operations are becoming quite complex as inline statements. I think that creating some methods for handling the different parts could improve readability. What do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192095829 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) --- End diff -- 
Shouldn't we throw an `AnalysisException` if they have different datatype? I think converting something to string and back to the desired datatype is not a good approach and it may cause issues. @gatorsmile what do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r192095262 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +313,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map without optional values + * and a partition filter specification. + */ + protected def visitPartition( --- End diff -- I see what you mean now. Yes, I have no better idea indeed. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r191823358 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +313,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map without optional values + * and a partition filter specification. + */ + protected def visitPartition( --- End diff -- I tried to add a new parameter to `AlterTableDropPartitionCommand` earlier, but it was kind of hard. Consider the SQL below: `DROP PARTITION(partitionVal1, expression1), PARTITION(partitionVal2, expression2)` The partitions that need to be dropped are: (`partitionVal1` intersect `expression1`) union (`partitionVal2` intersect `expression2`). Using one tuple tells us that `partitionVal1` and `expression1` are from the same `partitionSpec`, so we should use `intersect`. Likewise, different tuples mean that (`partitionVal1 intersect expression1`) and (`partitionVal2 intersect expression2`) are from different `partitionSpec`s, so we should use `union`. If we don't use a tuple, it would be difficult to tell these cases apart, and difficult to decide between `intersect` and `union` when `partitionVal1` meets `expression1`/`expression2`. Any ideas for replacing this `tuple`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r191805297 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) --- End diff 
-- The constant's dataType may be different from the partition's dataType. The difference may cause problems when the expression compares them later. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r191795550 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) + 
}.reduce(And) + + val partitions = catalog.listPartitionsByFilter( +table.identifier, Seq(parts)).map(_.spec) + if (partitions.isEmpty && !ifExists) { --- End diff -- There are two cases in which we get an empty seq here from the expression filters in one `DROP PARTITION` statement: 1. there is at least one filter, but there are no partitions for the filter; 2. there are no filters. If we don't add this check, this may be confusing later, because in the first case we should use `intersect` with `normalizedPartitionSpec`, but in the second case we shouldn't use `intersect`, because that would return an empty result. With this check we can treat the two cases differently: 1. throw an exception directly, regardless of `normalizedPartitionSpec`; 2. return `Seq.empty`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r191492537 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -282,6 +282,26 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging parts.toMap } + /** + * Create a partition filter specification. + */ + def visitPartitionFilterSpec(ctx: PartitionSpecContext): Seq[Expression] = withOrigin(ctx) { +val parts = ctx.expression.asScala.map { pVal => + expression(pVal) match { +case EqualNullSafe(_, _) => + throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx) +case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => + cmp +case bc @ BinaryComparison(constant: Literal, _) => + throw new ParseException("Literal " + constant --- End diff -- Sorry, I was careless. Will fix this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r191206009 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +313,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map without optional values + * and a partition filter specification. + */ + protected def visitPartition( --- End diff -- Can we avoid this method? I find it quite confusing (I mean, it is a bit weird to return a tuple with a Map and a Seq of different things). We can add a new parameter to `AlterTableDropPartitionCommand` and use the other two methods directly... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r191206680 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) + }.reduce(And) 
+ + val partitions = catalog.listPartitionsByFilter( +table.identifier, Seq(parts)).map(_.spec) + if (partitions.isEmpty && !ifExists) { --- End diff -- why do we need this check here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r191205128 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -282,6 +282,26 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging parts.toMap } + /** + * Create a partition filter specification. + */ + def visitPartitionFilterSpec(ctx: PartitionSpecContext): Seq[Expression] = withOrigin(ctx) { +val parts = ctx.expression.asScala.map { pVal => + expression(pVal) match { +case EqualNullSafe(_, _) => + throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx) +case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => + cmp +case bc @ BinaryComparison(constant: Literal, _) => + throw new ParseException("Literal " + constant ++ " is supported only on the rigth-side.", ctx) +case _ => + throw new ParseException("Invalid partition filter specification", ctx) --- End diff -- It would be useful to output to the user which expression was invalid and why. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r191204888 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -282,6 +282,26 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging parts.toMap } + /** + * Create a partition filter specification. + */ + def visitPartitionFilterSpec(ctx: PartitionSpecContext): Seq[Expression] = withOrigin(ctx) { +val parts = ctx.expression.asScala.map { pVal => + expression(pVal) match { +case EqualNullSafe(_, _) => + throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx) +case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => + cmp +case bc @ BinaryComparison(constant: Literal, _) => + throw new ParseException("Literal " + constant --- End diff -- nit: use `s""` and this can be a 1-line statement --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r191207036 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2.nonEmpty) { + val parts = partition._2.map { expr => +val (attrName, value) = expr match { + case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => +(name, constant.value) +} +if (!table.partitionColumnNames.exists(resolver(_, attrName))) { + throw new AnalysisException(s"${attrName} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} +val dataType = table.partitionSchema.apply(attrName).dataType +expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), + Cast(Literal(value.toString), dataType))) --- End diff -- 
why do we need to cast a new `Literal`? can't we just use `constant`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r190265826 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging parts.toMap } + /** + * Create a partition filter specification. + */ + def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) { +val parts = ctx.expression.asScala.map { pVal => + expression(pVal) match { +case EqualNullSafe(_, _) => + throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx) +case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => + cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant)) +case _ => + throw new ParseException("Invalid partition filter specification", ctx) + } +} +if(parts.isEmpty) { --- End diff -- you're right. I will change this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r190221738 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -495,6 +496,150 @@ class HiveDDLSuite } } + def testDropPartition(dataType: DataType, value: Any): Unit = { --- End diff -- Ok --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r190204871 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -495,6 +496,150 @@ class HiveDDLSuite } } + def testDropPartition(dataType: DataType, value: Any): Unit = { --- End diff -- can we add a similar test for checking that we are not dropping the partitions when the condition is not met? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r190205589 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala --- @@ -128,7 +128,7 @@ case class InsertIntoHadoopFsRelationCommand( val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions if (deletedPartitions.nonEmpty) { AlterTableDropPartitionCommand( -catalogTable.get.identifier, deletedPartitions.toSeq, +catalogTable.get.identifier, deletedPartitions.map(x => (x, Seq())).toSeq, --- End diff -- `Seq.empty` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r190205608 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala --- @@ -826,8 +826,8 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { val expected1_table = AlterTableDropPartitionCommand( tableIdent, Seq( -Map("dt" -> "2008-08-08", "country" -> "us"), -Map("dt" -> "2009-09-09", "country" -> "uk")), +(Map("dt" -> "2008-08-08", "country" -> "us"), Seq()), --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r190203697 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging parts.toMap } + /** + * Create a partition filter specification. + */ + def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) { +val parts = ctx.expression.asScala.map { pVal => + expression(pVal) match { +case EqualNullSafe(_, _) => + throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx) +case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => + cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant)) +case _ => + throw new ParseException("Invalid partition filter specification", ctx) + } +} +if(parts.isEmpty) { --- End diff -- why aren't we returning `parts`? this if seems pretty useless --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r181860148 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging parts.toMap } + /** + * Create a partition filter specification. + */ + def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) { +val parts = ctx.expression.asScala.map { pVal => + expression(pVal) match { +case EqualNullSafe(_, _) => + throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx) +case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => + cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant)) --- End diff -- OK. I'll work on this these days. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r181671014 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,58 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Expression)], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2 != null) { + partition._2.references.foreach { attr => +if (!table.partitionColumnNames.exists(resolver(_, attr.name))) { + throw new AnalysisException(s"${attr.name} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} + } +val partitions = catalog.listPartitionsByFilter( + table.identifier, Seq(partition._2)).map(_.spec) +if (partitions.isEmpty && !ifExists) { + throw new AnalysisException(s"There is no partition for ${partition._2.sql}") +} +partitions +} else { + Seq.empty[TablePartitionSpec] +} + }.distinct + + if (normalizedSpecs.isEmpty && 
partitionSet.isEmpty) { --- End diff -- I tried this command in Hive, and Hive only dropped the intersection of the two partition filters. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r179944192 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging parts.toMap } + /** + * Create a partition filter specification. + */ + def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) { +val parts = ctx.expression.asScala.map { pVal => + expression(pVal) match { +case EqualNullSafe(_, _) => + throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx) +case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => --- End diff -- We can also enforce this in the syntax, like here: https://github.com/apache/spark/pull/20999/files#diff-8c1cb2af4aa1109e08481dae79052cc3R269 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r179941575 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging parts.toMap } + /** + * Create a partition filter specification. + */ + def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) { +val parts = ctx.expression.asScala.map { pVal => + expression(pVal) match { +case EqualNullSafe(_, _) => + throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx) +case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => --- End diff -- If we support literals on the right side only, it seems it would be useful to print explicit error messages like `left-side literal not supported`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r179941480 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging parts.toMap } + /** + * Create a partition filter specification. + */ + def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) { +val parts = ctx.expression.asScala.map { pVal => + expression(pVal) match { +case EqualNullSafe(_, _) => + throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx) +case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => + cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant)) --- End diff -- What if the partition column is not of `String` type? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r179941447 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging parts.toMap } + /** + * Create a partition filter specification. + */ + def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) { +val parts = ctx.expression.asScala.map { pVal => + expression(pVal) match { +case EqualNullSafe(_, _) => + throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx) +case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => + cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant)) --- End diff -- Either way, we might need tests for non int-literal cases. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r179941409 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging parts.toMap } + /** + * Create a partition filter specification. + */ + def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) { +val parts = ctx.expression.asScala.map { pVal => + expression(pVal) match { +case EqualNullSafe(_, _) => + throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx) +case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => + cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant)) --- End diff -- Is it ok to pass all the type of literals here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r179940757 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -515,28 +515,58 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Expression)], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + val normalizedSpecs = PartitioningUtils.normalizePartitionSpec( +partition._1, +table.partitionColumnNames, +table.identifier.quotedString, +sparkSession.sessionState.conf.resolver) + + val partitionSet = { +if (partition._2 != null) { + partition._2.references.foreach { attr => +if (!table.partitionColumnNames.exists(resolver(_, attr.name))) { + throw new AnalysisException(s"${attr.name} is not a valid partition column " + +s"in table ${table.identifier.quotedString}.") +} + } +val partitions = catalog.listPartitionsByFilter( + table.identifier, Seq(partition._2)).map(_.spec) +if (partitions.isEmpty && !ifExists) { + throw new AnalysisException(s"There is no partition for ${partition._2.sql}") +} +partitions +} else { + Seq.empty[TablePartitionSpec] +} + }.distinct + + if (normalizedSpecs.isEmpty && 
partitionSet.isEmpty) { --- End diff -- Can't we just return `partitionSet ++ normalizedSpecs`? I think it is wrong to use `intersect`, we should drop all of them, shouldn't we? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r179940633 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging parts.toMap } + /** + * Create a partition filter specification. + */ + def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) { +val parts = ctx.expression.asScala.map { pVal => + expression(pVal) match { +case EqualNullSafe(_, _) => + throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx) +case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => + cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant)) +case _ => + throw new ParseException("Invalid partition filter specification", ctx) + } +} +if(parts.isEmpty) { --- End diff -- Wouldn't it be better to return the `Seq[Expression]` as it is? Later we need it in that form (in `listPartitionsByFilter`), and this way we can avoid using `null`, which is a good thing too... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r179940472 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging parts.toMap } + /** + * Create a partition filter specification. + */ + def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) { +val parts = ctx.expression.asScala.map { pVal => + expression(pVal) match { +case EqualNullSafe(_, _) => + throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx) +case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => --- End diff -- Hive supports them only on the right side. So it makes sense to have the same here I think. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r179934313 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -282,6 +282,27 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging parts.toMap } + /** + * Create a partition filter specification. + */ + def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) { +val parts = ctx.expression.asScala.map { pVal => + expression(pVal) match { +case EqualNullSafe(_, _) => + throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx) +case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => --- End diff -- Still the same question here: does the constant have to be on the right side? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
GitHub user DazhuangSu opened a pull request: https://github.com/apache/spark/pull/19691 [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITION should support comparators ## What changes were proposed in this pull request? This PR is inspired by @dongjoon-hyun. Quote from https://github.com/apache/spark/pull/15704 : > **What changes were proposed in this pull request?** This PR aims to support comparators, e.g. '<', '<=', '>', '>=', again in Apache Spark 2.0 for backward compatibility. **Spark 1.6** `scala> sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)") res0: org.apache.spark.sql.DataFrame = [result: string]` `scala> sql("ALTER TABLE sales DROP PARTITION (country < 'KR')") res1: org.apache.spark.sql.DataFrame = [result: string]` **Spark 2.0** `scala> sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)") res0: org.apache.spark.sql.DataFrame = []` `scala> sql("ALTER TABLE sales DROP PARTITION (country < 'KR')")` `org.apache.spark.sql.catalyst.parser.ParseException:` `mismatched input '<' expecting {')', ','}(line 1, pos 42)` After this PR, it's supported. **How was this patch tested?** Pass the Jenkins test with a newly added testcase. https://github.com/apache/spark/pull/16036 points out that using an int literal in DROP PARTITION fails after applying https://github.com/apache/spark/pull/15704. The reason for this failure in https://github.com/apache/spark/pull/15704 is that AlterTableDropPartitionCommand distinguishes BinaryComparison from EqualTo with the following code: `private def isRangeComparison(expr: Expression): Boolean = {` `expr.find(e => e.isInstanceOf[BinaryComparison] && !e.isInstanceOf[EqualTo]).isDefined }` This PR resolves this problem by distinguishing the kind of drop condition when parsing the SQL. ## How was this patch tested? 
New testcase introduced from https://github.com/apache/spark/pull/15704 You can merge this pull request into a Git repository by running: $ git pull https://github.com/DazhuangSu/spark SPARK-17732 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19691.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19691 commit 20f658ad8e14a94dd23bff6a8d795124d1db24e9 Author: Dylan Su Date: 2017-11-08T03:44:28Z [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITION should support comparators --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org