venkata91 commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r415368695



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
##########
@@ -157,3 +164,142 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
     }
   }
 }
+
+/**
+ * Test-only commit protocol that verifies its own partition-path bookkeeping before delegating
+ * to [[HadoopMapReduceCommitProtocol.commitTask]].
+ *
+ * When both dynamic partition overwrite and speculation are enabled, it recomputes the partition
+ * paths from `dynamicStagingTaskFiles` (made relative to `stagingDir`). It then asserts that
+ * they equal `partitionPaths`.
+ * NOTE(review): assumes both collections have the same collection kind (e.g. both Sets of
+ * String); if one were a Seq and the other a Set the comparison could never succeed — confirm.
+ */
+private class DetectDynamicStagingTaskPartitionPathCommitProtocol(
+    jobId: String,
+    path: String,
+    dynamicPartitionOverwrite: Boolean)
+  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {
+
+  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
+    if (dynamicPartitionOverwrite && isSpeculationEnabled) {
+      // Loop-invariant prefix stripped from every dynamic partition path; compute it once.
+      val stagingPrefix = stagingDir.toUri.getPath + Path.SEPARATOR
+      val partitionPathSet = dynamicStagingTaskFiles
+        .map(taskFile => getDynamicPartitionPath(taskFile, taskContext))
+        .map(_.toUri.getPath.stripPrefix(stagingPrefix))
+      // Include both sides in the message so a mismatch is diagnosable from the test log.
+      assert(partitionPathSet == partitionPaths,
+        s"Partition paths derived from dynamic staging task files ($partitionPathSet) " +
+          s"do not match the recorded partition paths ($partitionPaths)")
+    }
+    super.commitTask(taskContext)
+  }
+}
+
+/**
+ * A [[RawLocalFileSystem]] that simulates the following scenario in a dynamic partition
+ * overwrite with speculation enabled. The task with taskId 0 and attemptId 0 sees its rename
+ * from a dynamic staging file to the final file fail, because another task has already renamed
+ * that staging file to the final file.
+ *
+ * For such a source file, the rename is still carried out, but `false` is returned to the caller.
+ * All other renames are delegated unchanged to the local file system.
+ */
+class AnotherTaskRenamedForFirstTaskFirstAttemptFileSystem extends RawLocalFileSystem {
+  override def rename(src: Path, dst: Path): Boolean = {
+    val reportFailure = isFirstAttemptOfFirstTask(src)
+    val renamed = super.rename(src, dst)
+    if (reportFailure) false else renamed
+  }
+
+  /**
+   * Whether `src` is a `part-` file written by task 0, attempt 0. A name that cannot be parsed
+   * is treated as "not task 0 / attempt 0".
+   */
+  private def isFirstAttemptOfFirstTask(src: Path): Boolean =
+    src.getName.startsWith("part-") && Try {
+      // File name format is part-$split%05d-$jobId$ext: the task id is the second '-' field.
+      val taskId = src.getName.split("-").apply(1).toInt
+      // The attempt id is read from the last '-' field of the parent directory's name.
+      val attemptId = src.getParent.getName.split("-").last.toInt
+      taskId == 0 && attemptId == 0
+    }.getOrElse(false)
+}
+
+/**
+ * This file system simulates the following scenario in a dynamic partition overwrite operation
+ * with speculation enabled. The task whose taskId and attemptId are both 0 fails to rename a
+ * dynamic staging file to its final file, and no other task has renamed that dynamic staging
+ * file to the final file.
+ */
+class RenameFailedForFirstTaskFirstAttemptFileSystem extends 
RawLocalFileSystem {
+  override def rename(src: Path, dst: Path): Boolean = {
+    if (src.getName.startsWith("part-")) {

Review comment:
       Can we also mock the rename behavior of DistributedFileSystem here, i.e., fail with an
       exception if the directory does not exist?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to