Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19691#discussion_r179940757
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -515,28 +515,58 @@ case class AlterTableRenamePartitionCommand(
      */
     case class AlterTableDropPartitionCommand(
         tableName: TableIdentifier,
    -    specs: Seq[TablePartitionSpec],
    +    partitions: Seq[(TablePartitionSpec, Expression)],
         ifExists: Boolean,
         purge: Boolean,
         retainData: Boolean)
    -  extends RunnableCommand {
    +  extends RunnableCommand with PredicateHelper {
     
       override def run(sparkSession: SparkSession): Seq[Row] = {
         val catalog = sparkSession.sessionState.catalog
         val table = catalog.getTableMetadata(tableName)
    +    val resolver = sparkSession.sessionState.conf.resolver
         DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
     
    -    val normalizedSpecs = specs.map { spec =>
    -      PartitioningUtils.normalizePartitionSpec(
    -        spec,
    -        table.partitionColumnNames,
    -        table.identifier.quotedString,
    -        sparkSession.sessionState.conf.resolver)
    +    val toDrop = partitions.flatMap { partition =>
    +      val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
    +            partition._1,
    +            table.partitionColumnNames,
    +            table.identifier.quotedString,
    +            sparkSession.sessionState.conf.resolver)
    +
    +      val partitionSet = {
    +        if (partition._2 != null) {
    +          partition._2.references.foreach { attr =>
    +            if (!table.partitionColumnNames.exists(resolver(_, attr.name))) {
    +              throw new AnalysisException(s"${attr.name} is not a valid partition column " +
    +                s"in table ${table.identifier.quotedString}.")
    +            }
    +          }
    +            val partitions = catalog.listPartitionsByFilter(
    +              table.identifier, Seq(partition._2)).map(_.spec)
    +            if (partitions.isEmpty && !ifExists) {
    +              throw new AnalysisException(s"There is no partition for ${partition._2.sql}")
    +            }
    +            partitions
    +        } else {
    +          Seq.empty[TablePartitionSpec]
    +        }
    +      }.distinct
    +
    +      if (normalizedSpecs.isEmpty && partitionSet.isEmpty) {
    --- End diff ---
    
    Can't we just return `partitionSet ++ normalizedSpecs`? I think it is wrong to use `intersect` here; we should drop all of them, shouldn't we?
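    
    To make the difference concrete, here is a toy snippet (hypothetical specs, not code from this PR; `TablePartitionSpec` is just `Map[String, String]`):
    
    ```scala
    // Specs resolved from an expression filter vs. an explicitly listed spec.
    type TablePartitionSpec = Map[String, String]
    
    val fromFilter: Seq[TablePartitionSpec] = Seq(Map("p" -> "1"), Map("p" -> "2"))
    val explicit: Seq[TablePartitionSpec] = Seq(Map("p" -> "3"))
    
    // Union: every partition matched by either clause gets dropped.
    val union = (fromFilter ++ explicit).distinct
    // => Seq(Map(p -> 1), Map(p -> 2), Map(p -> 3))
    
    // Intersection: only specs present in BOTH collections survive, so here
    // nothing would be dropped at all.
    val both = fromFilter.intersect(explicit)
    // => Seq()
    ```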

