viirya commented on a change in pull request #25979: [SPARK-29295][SQL] Insert
overwrite to Hive external table partition should delete old data
URL: https://github.com/apache/spark/pull/25979#discussion_r335837451
##########
File path:
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
##########
@@ -209,6 +209,40 @@ case class InsertIntoHiveTable(
if (partition.nonEmpty) {
if (numDynamicPartitions > 0) {
+ if (overwrite && table.tableType == CatalogTableType.EXTERNAL) {
+ // SPARK-29295: When insert overwrite to a Hive external table
partition, if the
+ // partition does not exist, Hive will not check if the external
partition directory
+ // exists or not before copying files. So if users drop the
partition, and then do
+ // insert overwrite to the same partition, the partition will have
both old and new
+ // data. We construct partition path. If the path exists, we delete
it manually.
+ val dpMap = writtenParts.flatMap(_.split("/")).map { part =>
+ val splitPart = part.split("=")
+ assert(splitPart.size == 2, s"Invalid written partition path:
$part")
+ ExternalCatalogUtils.unescapePathName(splitPart(0)) ->
+ ExternalCatalogUtils.unescapePathName(splitPart(1))
+ }.toMap
+
+ val updatedPartitionSpec = partition.map {
+ case (key, Some(value)) => key -> value
+ case (key, None) if dpMap.contains(key) => key -> dpMap(key)
+ case (key, _) =>
+ throw new SparkException(s"Dynamic partition key $key is not
among " +
+ "written partition paths.")
+ }
+ val partitionColumnNames = table.partitionColumnNames
+ val tablePath = new Path(table.location)
+ val partitionPath =
ExternalCatalogUtils.generatePartitionPath(updatedPartitionSpec,
+ partitionColumnNames, tablePath)
+
+ val fs = partitionPath.getFileSystem(hadoopConf)
+ if (fs.exists(partitionPath)) {
+ if (!fs.delete(partitionPath, true)) {
Review comment:
`writtenParts` returned from `saveAsHiveFile`is the set of all partition
paths that were updated. Here the updated paths are used to fill partition
spec. The partition directory is constructed based on that.
For example, `INSERT OVERWRITE ... PARTITION(a=1,b)`, if we only update
`a=1,b=2`, `writtenParts` is Set("a=1/b=2"). `updatedPartitionSpec` is Map('a'
-> 1, 'b' -> 2). Then the partition directory is constructed based on the spec.
`a=1,b=1` is not touched.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]