Github user DazhuangSu commented on a diff in the pull request:
https://github.com/apache/spark/pull/19691#discussion_r192130248
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -515,28 +515,66 @@ case class AlterTableRenamePartitionCommand(
*/
case class AlterTableDropPartitionCommand(
tableName: TableIdentifier,
- specs: Seq[TablePartitionSpec],
+ partitions: Seq[(TablePartitionSpec, Seq[Expression])],
ifExists: Boolean,
purge: Boolean,
retainData: Boolean)
- extends RunnableCommand {
+ extends RunnableCommand with PredicateHelper {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
+ val resolver = sparkSession.sessionState.conf.resolver
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER
TABLE DROP PARTITION")
- val normalizedSpecs = specs.map { spec =>
- PartitioningUtils.normalizePartitionSpec(
- spec,
- table.partitionColumnNames,
- table.identifier.quotedString,
- sparkSession.sessionState.conf.resolver)
+ val toDrop = partitions.flatMap { partition =>
+ val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+ partition._1,
+ table.partitionColumnNames,
+ table.identifier.quotedString,
+ sparkSession.sessionState.conf.resolver)
+
+ val partitionSet = {
+ if (partition._2.nonEmpty) {
+ val parts = partition._2.map { expr =>
+ val (attrName, value) = expr match {
+ case BinaryComparison(UnresolvedAttribute(name :: Nil),
constant: Literal) =>
+ (name, constant.value)
+ }
+ if (!table.partitionColumnNames.exists(resolver(_, attrName)))
{
+ throw new AnalysisException(s"${attrName} is not a valid
partition column " +
+ s"in table ${table.identifier.quotedString}.")
+ }
+ val dataType = table.partitionSchema.apply(attrName).dataType
+ expr.withNewChildren(Seq(AttributeReference(attrName,
dataType)(),
+ Cast(Literal(value.toString), dataType)))
--- End diff --
Ok I get your point.
I just ran a quick test. It threw the exception "java.lang.RuntimeException:
Unsupported literal type class org.apache.spark.unsafe.types.UTF8String" at
this line when I ran:
`ALTER TABLE table_a PARTITION(a < 'test')`
So a one-line change in `literals.scala` is needed:
the method `def apply(v: Any): Literal` (literals.scala, line 52) currently
supports only `String`, not `UTF8String`.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]