venkata91 commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r415224131



##########
File path: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##########
@@ -236,13 +271,35 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage 
= {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, 
attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite && isSpeculationEnabled) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, 
taskContext)
+        fs.mkdirs(partitionPath)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
+          if (fs.exists(finalFile)) {
+            logWarning(
+              s"""
+                | For dynamic partition overwrite operation with speculation 
enabled, failed to

Review comment:
       What happens if YARN preemption is enabled — could the rename of the file 
succeed but the executor then get preempted? In that case, this warning might not 
make sense. I think we don't need to say anything specific to speculation.

##########
File path: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##########
@@ -236,13 +271,35 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage 
= {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, 
attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite && isSpeculationEnabled) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, 
taskContext)
+        fs.mkdirs(partitionPath)

Review comment:
       Check whether the directory already exists; only call 
`fs.mkdirs(partitionPath)` if it does not.

##########
File path: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##########
@@ -236,13 +271,35 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage 
= {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, 
attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite && isSpeculationEnabled) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, 
taskContext)
+        fs.mkdirs(partitionPath)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
+          if (fs.exists(finalFile)) {
+            logWarning(
+              s"""
+                | For dynamic partition overwrite operation with speculation 
enabled, failed to
+                | rename the staging dynamic file:$stagingTaskFile to 
$finalFile. Some other task
+                | has renamed a staging dynamic file to $finalFile. See 
details in SPARK-29302.
+              """.stripMargin)
+          } else {
+            throw new IOException(s"Failed to rename $stagingTaskFile to 
$finalFile")

Review comment:
       If the rename failed, shouldn't we wrap and propagate the underlying 
exception (e.g., permission denied)?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to