venkata91 commented on a change in pull request #26339: URL: https://github.com/apache/spark/pull/26339#discussion_r415368695
########## File path: sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala ########## @@ -157,3 +164,142 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession { } } } + +private class DetectDynamicStagingTaskPartitionPathCommitProtocol( + jobId: String, + path: String, + dynamicPartitionOverwrite: Boolean) + extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) { + + override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = { + if (dynamicPartitionOverwrite && isSpeculationEnabled) { + val partitionPathSet = dynamicStagingTaskFiles + .map(taskFile => getDynamicPartitionPath(taskFile, taskContext)) + .map(_.toUri.getPath.stripPrefix(stagingDir.toUri.getPath + + Path.SEPARATOR)) + assert(partitionPathSet.equals(partitionPaths)) + } + super.commitTask(taskContext) + } +} + +/** + * This file system is used to simulate the scene that for dynamic partition overwrite operation + * with speculation enabled, rename from dynamic staging files to final files failed for the task, + * whose taskId and attemptId are 0, because another task has renamed a dynamic staging file to the + * final file. 
+ */ +class AnotherTaskRenamedForFirstTaskFirstAttemptFileSystem extends RawLocalFileSystem { + override def rename(src: Path, dst: Path): Boolean = { + if (src.getName.startsWith("part-")) { + Try { + // File name format is part-$split%05d-$jobId$ext + val taskId = src.getName.split("-").apply(1).toInt + val attemptId = src.getParent.getName.split("-").last.toInt + taskId == 0 && attemptId == 0 + } match { + case Success(shouldRenameFailed) if shouldRenameFailed => + super.rename(src, dst) + false + case _ => super.rename(src, dst) + } + } else { + super.rename(src, dst) + } + } +} + +/** + * This file system is used to simulate the scene that for dynamic partition overwrite operation + * with speculation enabled, rename from dynamic staging files to final files failed for the task, + * whose taskId and attemptId are 0, and there is no another task has renamed a dynamic staging + * file to the final file. + */ +class RenameFailedForFirstTaskFirstAttemptFileSystem extends RawLocalFileSystem { + override def rename(src: Path, dst: Path): Boolean = { + if (src.getName.startsWith("part-")) { Review comment: Can we also mock the behavior of DistributedFileSystem's rename here, i.e., fail with an exception if the directory does not exist? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org