Github user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/19691#discussion_r193000314
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand(
*
* The syntax of this command is:
* {{{
- * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
+ * ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1)
+ * [, PARTITION (spec2, expr2), ...] [PURGE];
* }}}
*/
case class AlterTableDropPartitionCommand(
tableName: TableIdentifier,
- specs: Seq[TablePartitionSpec],
+ partitions: Seq[(TablePartitionSpec, Seq[Expression])],
ifExists: Boolean,
purge: Boolean,
retainData: Boolean)
- extends RunnableCommand {
+ extends RunnableCommand with PredicateHelper {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
+ val resolver = sparkSession.sessionState.conf.resolver
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
- val normalizedSpecs = specs.map { spec =>
- PartitioningUtils.normalizePartitionSpec(
- spec,
- table.partitionColumnNames,
- table.identifier.quotedString,
- sparkSession.sessionState.conf.resolver)
+ val toDrop = partitions.flatMap { partition =>
+ if (partition._1.isEmpty && !partition._2.isEmpty) {
+ // There are only expressions in this drop condition.
+ extractFromPartitionFilter(partition._2, catalog, table, resolver)
+ } else if (!partition._1.isEmpty && partition._2.isEmpty) {
+ // There are only partitionSpecs in this drop condition.
+ extractFromPartitionSpec(partition._1, table, resolver)
+ } else if (!partition._1.isEmpty && !partition._2.isEmpty) {
+ // This drop condition has both partitionSpecs and expressions.
+        extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect(
+ extractFromPartitionSpec(partition._1, table, resolver))
+ } else {
+ Seq.empty[TablePartitionSpec]
+ }
}
catalog.dropPartitions(
-      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
+      table.identifier, toDrop, ignoreIfNotExists = ifExists, purge = purge,
retainData = retainData)
CommandUtils.updateTableStats(sparkSession, table)
Seq.empty[Row]
}
+ private def extractFromPartitionSpec(
+ specs: TablePartitionSpec,
+ table: CatalogTable,
+ resolver: Resolver): Seq[Map[String, String]] = {
+ Seq(PartitioningUtils.normalizePartitionSpec(
+ specs,
+ table.partitionColumnNames,
+ table.identifier.quotedString,
+ resolver))
+ }
+
+ private def extractFromPartitionFilter(
+ filters: Seq[Expression],
+ catalog: SessionCatalog,
+ table: CatalogTable,
+ resolver: Resolver): Seq[TablePartitionSpec] = {
+ val expressions = filters.map { expr =>
+ val (attrName, constant) = expr match {
+        case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
+ (name, constant)
+ }
+ if (!table.partitionColumnNames.exists(resolver(_, attrName))) {
+        throw new AnalysisException(s"${attrName} is not a valid partition column " +
+ s"in table ${table.identifier.quotedString}.")
+ }
+ val dataType = table.partitionSchema.apply(attrName).dataType
+ expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(),
+ Cast(constant, dataType)))
--- End diff ---
nit: can we add the cast only when needed, i.e. when `dataType != constant.dataType`?