Github user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/20999#discussion_r223626350
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -521,35 +521,112 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitionsFilters: Seq[Seq[Expression]],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
   extends RunnableCommand {
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
+    val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone)
     val table = catalog.getTableMetadata(tableName)
+    val partitionColumns = table.partitionColumnNames
+    val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val resolvedSpecs = partitionsFilters.flatMap { filtersSpec =>
+      if (hasComplexFilters(filtersSpec)) {
+        generatePartitionSpec(filtersSpec,
+          partitionColumns,
+          partitionAttributes,
+          table.identifier,
+          catalog,
+          sparkSession.sessionState.conf.resolver,
+          timeZone,
+          ifExists)
+      } else {
+        val partitionSpec = filtersSpec.map {
+          case EqualTo(key: Attribute, Literal(value, StringType)) =>
+            key.name -> value.toString
+        }.toMap
+        PartitioningUtils.normalizePartitionSpec(
+          partitionSpec,
+          partitionColumns,
+          table.identifier.quotedString,
+          sparkSession.sessionState.conf.resolver) :: Nil
+      }
     }
     catalog.dropPartitions(
-      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
+      table.identifier, resolvedSpecs, ignoreIfNotExists = ifExists, purge = purge,
       retainData = retainData)
     CommandUtils.updateTableStats(sparkSession, table)
     Seq.empty[Row]
   }
+  def hasComplexFilters(partitionFilterSpec: Seq[Expression]): Boolean = {
+    partitionFilterSpec.exists(!_.isInstanceOf[EqualTo])
+  }
+
+  def generatePartitionSpec(
+      partitionFilterSpec: Seq[Expression],
+      partitionColumns: Seq[String],
+      partitionAttributes: Map[String, Attribute],
+      tableIdentifier: TableIdentifier,
+      catalog: SessionCatalog,
+      resolver: Resolver,
+      timeZone: Option[String],
+      ifExists: Boolean): Seq[TablePartitionSpec] = {
+    val filters = partitionFilterSpec.map { pFilter =>
+      pFilter.transform {
+        // Resolve the partition attributes
+        case partitionCol: Attribute =>
+          val normalizedPartition = PartitioningUtils.normalizePartitionColumn(
+            partitionCol.name,
+            partitionColumns,
+            tableIdentifier.quotedString,
+            resolver)
+          partitionAttributes(normalizedPartition)
+      }.transform {
+        // Cast the partition value to the data type of the corresponding partition attribute
+        case cmp @ BinaryComparison(partitionAttr, value)
+            if !partitionAttr.dataType.sameType(value.dataType) =>
+          cmp.withNewChildren(Seq(partitionAttr, Cast(value, partitionAttr.dataType, timeZone)))
--- End diff ---
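[Editor's note: for context on the two branches in the diff above, an equality-only spec keeps the pre-existing `normalizePartitionSpec` path, while any other comparison routes through the new `generatePartitionSpec`. A minimal sketch of the statements exercising each path, assuming a hypothetical `logs` table partitioned by `dt` and a Spark shell with the PR's parser changes in place:

```scala
// Equality-only spec: hasComplexFilters is false, so the filter folds
// directly into a TablePartitionSpec and is normalized as before.
spark.sql("ALTER TABLE logs DROP IF EXISTS PARTITION (dt = '2018-01-01')")

// A non-equality comparison makes hasComplexFilters true, so the partitions
// to drop are computed by resolving the filter against the catalog.
spark.sql("ALTER TABLE logs DROP IF EXISTS PARTITION (dt < '2018-01-01')")
```
]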
Yes, please see the tests here:
https://github.com/apache/spark/pull/20999/files/a964d2a7def5aed04bd362b3000b36583c0ba272#diff-b7094baa12601424a5d19cb930e3402fR663.
Notice that the value is always a `string`, so in every case where the partition column has a different data type we rely on the cast.
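
[Editor's note: to make the `string` point concrete, here is a standalone sketch (an illustration, not code from the PR) of what the second `transform` above does to a filter such as `part > 5` on an `INT` partition column:

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

// The parser emits the partition value as a StringType literal, so the
// filter arrives as GreaterThan(part: int, Literal("5"): string).
val part = AttributeReference("part", IntegerType)()
val parsed: Expression = GreaterThan(part, Literal("5"))

// The transform wraps the mismatched value in a Cast to the partition
// column's type: GreaterThan(part, Cast(Literal("5"), IntegerType)).
val resolved = parsed.transform {
  case cmp @ BinaryComparison(attr, value)
      if !attr.dataType.sameType(value.dataType) =>
    cmp.withNewChildren(Seq(attr, Cast(value, attr.dataType, None)))
}
```

Threading `timeZone` through matters for `date` and `timestamp` partition columns, where the string-to-type cast is time-zone dependent.]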