cloud-fan commented on a change in pull request #25979: [SPARK-29295][SQL] 
Insert overwrite to Hive external table partition should delete old data
URL: https://github.com/apache/spark/pull/25979#discussion_r335835188
 
 

 ##########
 File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 ##########
 @@ -209,6 +209,40 @@ case class InsertIntoHiveTable(
 
     if (partition.nonEmpty) {
       if (numDynamicPartitions > 0) {
+        if (overwrite && table.tableType == CatalogTableType.EXTERNAL) {
+          // SPARK-29295: When insert overwrite to a Hive external table 
partition, if the
+          // partition does not exist, Hive will not check if the external 
partition directory
+          // exists or not before copying files. So if users drop the 
partition, and then do
+          // insert overwrite to the same partition, the partition will have 
both old and new
+          // data. We construct partition path. If the path exists, we delete 
it manually.
+          val dpMap = writtenParts.flatMap(_.split("/")).map { part =>
+            val splitPart = part.split("=")
+            assert(splitPart.size == 2, s"Invalid written partition path: 
$part")
+            ExternalCatalogUtils.unescapePathName(splitPart(0)) ->
+              ExternalCatalogUtils.unescapePathName(splitPart(1))
+          }.toMap
+
+          val updatedPartitionSpec = partition.map {
+            case (key, Some(value)) => key -> value
+            case (key, None) if dpMap.contains(key) => key -> dpMap(key)
+            case (key, _) =>
+              throw new SparkException(s"Dynamic partition key $key is not 
among " +
+                "written partition paths.")
+          }
+          val partitionColumnNames = table.partitionColumnNames
+          val tablePath = new Path(table.location)
+          val partitionPath = 
ExternalCatalogUtils.generatePartitionPath(updatedPartitionSpec,
+            partitionColumnNames, tablePath)
+
+          val fs = partitionPath.getFileSystem(hadoopConf)
+          if (fs.exists(partitionPath)) {
+            if (!fs.delete(partitionPath, true)) {
 
 Review comment:
   hmmm is this corrected? AFAIK hive only overwrite the partition if there is 
new data. e.g. `INSERT OVERWRITE ... PARTITION(a=1,b)`, Hive will leave 
partition `a=1,b=1` unchanged if the input query does not have data for this 
partition.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to