Github user DazhuangSu commented on a diff in the pull request:
https://github.com/apache/spark/pull/19691#discussion_r191795550
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
*/
case class AlterTableDropPartitionCommand(
tableName: TableIdentifier,
- specs: Seq[TablePartitionSpec],
+ partitions: Seq[(TablePartitionSpec, Seq[Expression])],
ifExists: Boolean,
purge: Boolean,
retainData: Boolean)
- extends RunnableCommand {
+ extends RunnableCommand with PredicateHelper {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
+ val resolver = sparkSession.sessionState.conf.resolver
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER
TABLE DROP PARTITION")
- val normalizedSpecs = specs.map { spec =>
- PartitioningUtils.normalizePartitionSpec(
- spec,
- table.partitionColumnNames,
- table.identifier.quotedString,
- sparkSession.sessionState.conf.resolver)
+ val toDrop = partitions.flatMap { partition =>
+ val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+ partition._1,
+ table.partitionColumnNames,
+ table.identifier.quotedString,
+ sparkSession.sessionState.conf.resolver)
+
+ val partitionSet = {
+ if (partition._2.nonEmpty) {
+ val parts = partition._2.map { expr =>
+ val (attrName, value) = expr match {
+ case BinaryComparison(UnresolvedAttribute(name :: Nil),
constant: Literal) =>
+ (name, constant.value)
+ }
+ if (!table.partitionColumnNames.exists(resolver(_, attrName)))
{
+ throw new AnalysisException(s"${attrName} is not a valid
partition column " +
+ s"in table ${table.identifier.quotedString}.")
+ }
+ val dataType = table.partitionSchema.apply(attrName).dataType
+ expr.withNewChildren(Seq(AttributeReference(attrName,
dataType)(),
+ Cast(Literal(value.toString), dataType)))
+ }.reduce(And)
+
+ val partitions = catalog.listPartitionsByFilter(
+ table.identifier, Seq(parts)).map(_.spec)
+ if (partitions.isEmpty && !ifExists) {
--- End diff --
There are two cases where we can get an empty seq here from the expression filters
in one `DROP PARTITION` statement:
1. there is at least one filter, but no partitions match the filter.
2. there are no filters at all.
If we don't add this check, it may be confusing later,
because in the first case we should `intersect` with
`normalizedPartitionSpec`,
but in the second case we shouldn't use `intersect`, since that would
return an empty result.
With this check added, we can handle the two cases differently:
1. ignore `normalizedPartitionSpec` and throw an exception directly.
2. return `Seq.empty`.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]