Github user DazhuangSu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19691#discussion_r191795550
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
      */
     case class AlterTableDropPartitionCommand(
         tableName: TableIdentifier,
    -    specs: Seq[TablePartitionSpec],
    +    partitions: Seq[(TablePartitionSpec, Seq[Expression])],
         ifExists: Boolean,
         purge: Boolean,
         retainData: Boolean)
    -  extends RunnableCommand {
    +  extends RunnableCommand with PredicateHelper {
     
       override def run(sparkSession: SparkSession): Seq[Row] = {
         val catalog = sparkSession.sessionState.catalog
         val table = catalog.getTableMetadata(tableName)
    +    val resolver = sparkSession.sessionState.conf.resolver
         DDLUtils.verifyAlterTableType(catalog, table, isView = false)
         DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER 
TABLE DROP PARTITION")
     
    -    val normalizedSpecs = specs.map { spec =>
    -      PartitioningUtils.normalizePartitionSpec(
    -        spec,
    -        table.partitionColumnNames,
    -        table.identifier.quotedString,
    -        sparkSession.sessionState.conf.resolver)
    +    val toDrop = partitions.flatMap { partition =>
    +      val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
    +            partition._1,
    +            table.partitionColumnNames,
    +            table.identifier.quotedString,
    +            sparkSession.sessionState.conf.resolver)
    +
    +      val partitionSet = {
    +        if (partition._2.nonEmpty) {
    +          val parts = partition._2.map { expr =>
    +            val (attrName, value) = expr match {
    +              case BinaryComparison(UnresolvedAttribute(name :: Nil), 
constant: Literal) =>
    +                (name, constant.value)
    +            }
    +            if (!table.partitionColumnNames.exists(resolver(_, attrName))) 
{
    +              throw new AnalysisException(s"${attrName} is not a valid 
partition column " +
    +                s"in table ${table.identifier.quotedString}.")
    +            }
    +            val dataType = table.partitionSchema.apply(attrName).dataType
    +            expr.withNewChildren(Seq(AttributeReference(attrName, 
dataType)(),
    +              Cast(Literal(value.toString), dataType)))
    +          }.reduce(And)
    +
    +          val partitions = catalog.listPartitionsByFilter(
    +            table.identifier, Seq(parts)).map(_.spec)
    +          if (partitions.isEmpty && !ifExists) {
    --- End diff --
    
    There are two scenarios in which we can get an empty seq here from the 
expression filters in one `DROP PARTITION` sql:
    1. there is at least one filter, but no partitions match the filter.
    2. there are no filters at all.
    
    If we don't add this check, things may become confusing later, because 
the two scenarios need different handling: in the first one we should 
`intersect` with `normalizedPartitionSpec`, but in the second one we 
shouldn't use `intersect`, because that would always return an empty 
result.
    
    With this check added, we can handle the two scenarios in different ways:
    1. disregard `normalizedPartitionSpec` and throw an exception directly
    2. return `Seq.empty`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to