GitHub user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19691#discussion_r192367934
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
      */
     case class AlterTableDropPartitionCommand(
         tableName: TableIdentifier,
    -    specs: Seq[TablePartitionSpec],
    +    partitions: Seq[(TablePartitionSpec, Seq[Expression])],
         ifExists: Boolean,
         purge: Boolean,
         retainData: Boolean)
    -  extends RunnableCommand {
    +  extends RunnableCommand with PredicateHelper {
     
       override def run(sparkSession: SparkSession): Seq[Row] = {
         val catalog = sparkSession.sessionState.catalog
         val table = catalog.getTableMetadata(tableName)
    +    val resolver = sparkSession.sessionState.conf.resolver
         DDLUtils.verifyAlterTableType(catalog, table, isView = false)
         DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
     
    -    val normalizedSpecs = specs.map { spec =>
    -      PartitioningUtils.normalizePartitionSpec(
    -        spec,
    -        table.partitionColumnNames,
    -        table.identifier.quotedString,
    -        sparkSession.sessionState.conf.resolver)
    +    val toDrop = partitions.flatMap { partition =>
    +      val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
    +            partition._1,
    +            table.partitionColumnNames,
    +            table.identifier.quotedString,
    +            sparkSession.sessionState.conf.resolver)
    +
    +      val partitionSet = {
    +        if (partition._2.nonEmpty) {
    +          val parts = partition._2.map { expr =>
    +            val (attrName, value) = expr match {
    +              case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
    +                (name, constant.value)
    +            }
    +            if (!table.partitionColumnNames.exists(resolver(_, attrName))) {
    +              throw new AnalysisException(s"${attrName} is not a valid partition column " +
    +                s"in table ${table.identifier.quotedString}.")
    +            }
    +            val dataType = table.partitionSchema.apply(attrName).dataType
    +            expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(),
    +              Cast(Literal(value.toString), dataType)))
    +          }.reduce(And)
    +
    +          val partitions = catalog.listPartitionsByFilter(
    +            table.identifier, Seq(parts)).map(_.spec)
    +          if (partitions.isEmpty && !ifExists) {
    --- End diff --
    
    Yes, but in the first case `toDrop` would be empty, while in the second case it
    would contain `partitionVal1`. So when it is later passed to `dropPartitions`,
    that method checks whether it is empty.
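    
    To make the two cases concrete, here is a minimal sketch (not the actual Spark
    code path: `existingPartitions`, `fakeListPartitionsByFilter` and `dropOrFail`
    are illustrative stand-ins) of how an expression-based spec can produce an empty
    or a non-empty `toDrop`, and how the emptiness is handled afterwards:
    
        // Minimal sketch only; in Spark, TablePartitionSpec is Map[String, String].
        object DropPartitionSketch {
          type TablePartitionSpec = Map[String, String]
    
          // Stand-in for catalog.listPartitionsByFilter(...).map(_.spec): keeps only
          // the specs matching the predicate built from the DROP PARTITION expressions.
          def fakeListPartitionsByFilter(
              existing: Seq[TablePartitionSpec],
              predicate: TablePartitionSpec => Boolean): Seq[TablePartitionSpec] =
            existing.filter(predicate)
    
          // Stand-in for the final step: the command hands toDrop to dropPartitions,
          // and an empty toDrop is only an error when IF EXISTS was not specified.
          def dropOrFail(toDrop: Seq[TablePartitionSpec], ifExists: Boolean): Unit = {
            if (toDrop.isEmpty && !ifExists) {
              throw new IllegalArgumentException("No partition matches the given spec.")
            }
            // otherwise: catalog.dropPartitions(tableName, toDrop, ...)
          }
    
          def main(args: Array[String]): Unit = {
            val existingPartitions = Seq(Map("part_col" -> "partitionVal1"))
    
            // First case: the filter matches nothing, so toDrop is empty.
            val toDropEmpty =
              fakeListPartitionsByFilter(existingPartitions, _("part_col") == "noSuchValue")
    
            // Second case: the filter matches, so toDrop contains partitionVal1's spec.
            val toDropMatch =
              fakeListPartitionsByFilter(existingPartitions, _("part_col") == "partitionVal1")
    
            dropOrFail(toDropMatch, ifExists = false)  // drops the matching partition
            dropOrFail(toDropEmpty, ifExists = true)   // no-op: IF EXISTS tolerates an empty list
            dropOrFail(toDropEmpty, ifExists = false)  // fails: nothing matched and no IF EXISTS
          }
        }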

