xkrogen commented on a change in pull request #32530:
URL: https://github.com/apache/spark/pull/32530#discussion_r631885834
##########
File path:
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##########
@@ -188,13 +188,18 @@ class HadoopMapReduceCommitProtocol(
     val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
     logDebug(s"Committing files staged for absolute locations $filesToMove")
+    val absParentPaths = filesToMove.values.map(new Path(_).getParent).toSet
     if (dynamicPartitionOverwrite) {
-      val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
-      logDebug(s"Clean up absolute partition directories for overwriting: $absPartitionPaths")
-      absPartitionPaths.foreach(fs.delete(_, true))
+      logDebug(s"Clean up absolute partition directories for overwriting: $absParentPaths")
+      absParentPaths.foreach(fs.delete(_, true))
     }
+    logDebug(s"Create absolute parent directories: $absParentPaths")
+    absParentPaths.foreach(fs.mkdirs)
     for ((src, dst) <- filesToMove) {
-      fs.rename(new Path(src), new Path(dst))
+      if (!fs.rename(new Path(src), new Path(dst))) {
Review comment:
I believe we do actually need the rename check here. My PR was based on a very
limited understanding of this code path and assumed that, given HDFS
semantics, the code worked properly without it. Later investigation (as in
the comments) showed this wasn't the case and that checking the result of
this rename step is actually necessary.
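To illustrate why the check matters: Hadoop's `FileSystem.rename` reports many failure modes (such as a missing destination parent directory) by returning `false` rather than throwing, so ignoring the return value can silently drop files. The sketch below is a hypothetical standalone demo using `java.io.File.renameTo`, which has the same boolean-return convention, not the actual Spark/Hadoop code; `moveOrFail` is an illustrative helper, not part of the PR.

```scala
import java.io.{File, IOException}
import java.nio.file.Files

object RenameCheckDemo {
  // Hypothetical helper mirroring the PR's fix: surface a silent
  // boolean failure from rename as an explicit exception.
  def moveOrFail(src: File, dst: File): Unit = {
    if (!src.renameTo(dst)) {
      throw new IOException(s"Failed to rename $src to $dst")
    }
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("rename-demo").toFile
    val src = new File(dir, "part-00000")
    src.createNewFile()

    // Destination whose parent directory does not exist: renameTo
    // returns false instead of throwing, so an unchecked call would
    // silently leave the staged file behind.
    val badDst = new File(dir, "missing-parent/part-00000")
    println(s"rename without parent dir: ${src.renameTo(badDst)}") // false

    // Creating the parent first (analogous to the fs.mkdirs call in
    // the patch) lets the rename succeed.
    badDst.getParentFile.mkdirs()
    moveOrFail(src, badDst)
    println(s"moved: ${badDst.exists()}") // true
  }
}
```

The same reasoning applies to the patched loop: `fs.mkdirs` guarantees the parent directories exist, and the `if (!fs.rename(...))` guard turns any remaining silent failure into a detectable error.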
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]