Github user DazhuangSu commented on a diff in the pull request:
https://github.com/apache/spark/pull/19691#discussion_r192147689
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
*/
case class AlterTableDropPartitionCommand(
tableName: TableIdentifier,
- specs: Seq[TablePartitionSpec],
+ partitions: Seq[(TablePartitionSpec, Seq[Expression])],
ifExists: Boolean,
purge: Boolean,
retainData: Boolean)
- extends RunnableCommand {
+ extends RunnableCommand with PredicateHelper {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
+ val resolver = sparkSession.sessionState.conf.resolver
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER
TABLE DROP PARTITION")
- val normalizedSpecs = specs.map { spec =>
- PartitioningUtils.normalizePartitionSpec(
- spec,
- table.partitionColumnNames,
- table.identifier.quotedString,
- sparkSession.sessionState.conf.resolver)
+ val toDrop = partitions.flatMap { partition =>
+ val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+ partition._1,
+ table.partitionColumnNames,
+ table.identifier.quotedString,
+ sparkSession.sessionState.conf.resolver)
+
+ val partitionSet = {
+ if (partition._2.nonEmpty) {
+ val parts = partition._2.map { expr =>
+ val (attrName, value) = expr match {
+ case BinaryComparison(UnresolvedAttribute(name :: Nil),
constant: Literal) =>
+ (name, constant.value)
+ }
+ if (!table.partitionColumnNames.exists(resolver(_, attrName)))
{
+ throw new AnalysisException(s"${attrName} is not a valid
partition column " +
+ s"in table ${table.identifier.quotedString}.")
+ }
+ val dataType = table.partitionSchema.apply(attrName).dataType
+ expr.withNewChildren(Seq(AttributeReference(attrName,
dataType)(),
+ Cast(Literal(value.toString), dataType)))
+ }.reduce(And)
+
+ val partitions = catalog.listPartitionsByFilter(
+ table.identifier, Seq(parts)).map(_.spec)
+ if (partitions.isEmpty && !ifExists) {
--- End diff --
Let me explain these two cases more clearly, using two SQL statements as an example (here `useless_expression` denotes an expression that matches no partitions):
`ALTER TABLE DROP PARTITION(partitionVal1, useless_expression)`
`ALTER TABLE DROP PARTITION(partitionVal1)`
The first statement should drop the <b>intersection</b> of partition `partitionVal1` and `useless_expression`, which is empty.
The second statement should drop partition `partitionVal1`.
If `partitionSet` is set to `Seq.empty` for both statements, it will be impossible to tell them apart later.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]