viirya commented on a change in pull request #25979: [SPARK-29295][SQL] Insert overwrite to Hive external table partition should delete old data
URL: https://github.com/apache/spark/pull/25979#discussion_r335837451
 
 

 ##########
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 ##########
 @@ -209,6 +209,40 @@ case class InsertIntoHiveTable(
 
     if (partition.nonEmpty) {
       if (numDynamicPartitions > 0) {
+        if (overwrite && table.tableType == CatalogTableType.EXTERNAL) {
+          // SPARK-29295: When insert overwrite to a Hive external table partition, if the
+          // partition does not exist, Hive will not check if the external partition directory
+          // exists or not before copying files. So if users drop the partition, and then do
+          // insert overwrite to the same partition, the partition will have both old and new
+          // data. We construct partition path. If the path exists, we delete it manually.
+          val dpMap = writtenParts.flatMap(_.split("/")).map { part =>
+            val splitPart = part.split("=")
+            assert(splitPart.size == 2, s"Invalid written partition path: $part")
+            ExternalCatalogUtils.unescapePathName(splitPart(0)) ->
+              ExternalCatalogUtils.unescapePathName(splitPart(1))
+          }.toMap
+
+          val updatedPartitionSpec = partition.map {
+            case (key, Some(value)) => key -> value
+            case (key, None) if dpMap.contains(key) => key -> dpMap(key)
+            case (key, _) =>
+              throw new SparkException(s"Dynamic partition key $key is not among " +
+                "written partition paths.")
+          }
+          val partitionColumnNames = table.partitionColumnNames
+          val tablePath = new Path(table.location)
+          val partitionPath = ExternalCatalogUtils.generatePartitionPath(updatedPartitionSpec,
+            partitionColumnNames, tablePath)
+
+          val fs = partitionPath.getFileSystem(hadoopConf)
+          if (fs.exists(partitionPath)) {
+            if (!fs.delete(partitionPath, true)) {
 
 Review comment:
   `writtenParts`, returned from `saveAsHiveFile`, is the set of all partition 
paths that were updated. Here, the updated paths are used to fill in the dynamic 
values of the partition spec, and the partition directory is constructed from 
that spec.
   
   For example, given `INSERT OVERWRITE ... PARTITION(a=1,b)`, if we only update 
`a=1,b=2`, then `writtenParts` is `Set("a=1/b=2")` and `updatedPartitionSpec` is 
`Map("a" -> "1", "b" -> "2")`. The partition directory is then constructed from 
that spec, so `a=1,b=1` is not touched.
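
   The example above can be sketched in plain Scala without any Spark dependencies. This is only an illustration of the logic in the diff, not the PR's code: the object name, `dynamicPartitionValues`, `resolveSpec`, `partitionDir`, and the table path `/warehouse/t` are hypothetical, and the sketch skips the `ExternalCatalogUtils.unescapePathName` step and replaces `generatePartitionPath` with simple string joining.

```scala
object PartitionSpecSketch {
  // Turn written partition paths such as "a=1/b=2" into a column -> value map.
  // Assumption: no escaped characters, so unescapePathName is omitted here.
  def dynamicPartitionValues(writtenParts: Set[String]): Map[String, String] =
    writtenParts.flatMap(_.split("/").toSeq).map { part =>
      val splitPart = part.split("=")
      require(splitPart.length == 2, s"Invalid written partition path: $part")
      splitPart(0) -> splitPart(1)
    }.toMap

  // Fill the dynamic (None) entries of the user-supplied spec from the written paths;
  // static entries (Some) are kept as given.
  def resolveSpec(
      partition: Map[String, Option[String]],
      dpMap: Map[String, String]): Map[String, String] =
    partition.map {
      case (key, Some(value)) => key -> value
      case (key, None) if dpMap.contains(key) => key -> dpMap(key)
      case (key, _) =>
        throw new IllegalArgumentException(
          s"Dynamic partition key $key is not among written partition paths.")
    }

  // Hypothetical, simplified stand-in for generatePartitionPath: joins
  // "col=value" segments in partition-column order under the table path.
  def partitionDir(
      spec: Map[String, String],
      partitionColumnNames: Seq[String],
      tablePath: String): String =
    partitionColumnNames.map(c => s"$c=${spec(c)}").mkString(tablePath + "/", "/", "")

  def main(args: Array[String]): Unit = {
    // INSERT OVERWRITE ... PARTITION(a=1,b) where only a=1,b=2 was written.
    val writtenParts = Set("a=1/b=2")
    val spec = resolveSpec(
      Map("a" -> Some("1"), "b" -> None),
      dynamicPartitionValues(writtenParts))
    println(spec)
    println(partitionDir(spec, Seq("a", "b"), "/warehouse/t"))
  }
}
```

   With these inputs the spec resolves to `a -> 1, b -> 2` and the directory to `/warehouse/t/a=1/b=2`, so only that directory would be a candidate for deletion.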
