[GitHub] [spark] venkata91 commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task
venkata91 commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r416259866

## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

```scala
@@ -236,13 +272,33 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(fs, stagingTaskFile, taskContext)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
+          if (fs.exists(finalFile)) {
+            logWarning(
+              s"""
+                 | Some other task had renamed a staging dynamic file to $finalFile.
+                 | See details in SPARK-29302.
+               """.stripMargin)
+          } else {
+            throw new IOException(s"Failed to rename $stagingTaskFile to $finalFile")
```

Review comment:
@turboFei It seems that even with outputCoordinatorEnabled, there are cases with speculation where the other task attempt also tries to commit and fails with an IOException because the finalFile already exists; I think committer.needsTaskCommit is turning out to be false in those cases. This needs to be fixed. I don't think throwing an IOException is fine here.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
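The race discussed above can be sketched without Hadoop at all. The following is a minimal, hypothetical illustration (not the PR's code) of the "first committer wins" idea: an attempt that finds the final file already present treats the commit as done rather than failing. The name `moveOrAcceptExisting` and the use of `java.nio.file` instead of Hadoop's `FileSystem` are assumptions for the sake of a self-contained example.

```scala
import java.nio.file.{Files, Path}

object SpeculativeCommitSketch {
  // Hypothetical commit-time move: if another (speculative) attempt of the
  // same task already produced the final file, accept it as equivalent
  // output instead of throwing, mirroring the behavior the review asks for.
  def moveOrAcceptExisting(staging: Path, finalFile: Path): Boolean = {
    if (Files.exists(finalFile)) {
      true // another attempt won the race; its output is equivalent
    } else {
      // HDFS-like: the destination parent must exist before a rename.
      Files.createDirectories(finalFile.getParent)
      try {
        Files.move(staging, finalFile)
        true
      } catch {
        // Lost the race between the exists() check and the move.
        case _: java.nio.file.FileAlreadyExistsException => Files.exists(finalFile)
      }
    }
  }
}
```

With this shape, a losing speculative attempt returns success instead of surfacing an IOException to the scheduler.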
venkata91 commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r416026281

## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

```scala
@@ -118,7 +147,13 @@ class HadoopMapReduceCommitProtocol(
     }

     dir.map { d =>
-      new Path(new Path(stagingDir, d), filename).toString
+      if (dynamicPartitionOverwrite && isSpeculationEnabled) {
```

Review comment:
I think this issue will happen even when speculation is not enabled. When a task fails (let's say due to executor OOM or various other reasons), it leaves partial data behind, and when the new attempt executes, it fails with FileAlreadyExistsException.
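The fix for the retry collision described above is visible in the PR's `dynamicStagingTaskPath` (quoted in full later in this thread): the task attempt ID is appended to the per-partition staging directory name, so a retried attempt writes under a fresh path and never collides with partial files from a failed earlier attempt. A stripped-down sketch of that composition, with a simplified string-based signature for illustration:

```scala
object StagingPathSketch {
  // Simplified model of the PR's dynamicStagingTaskPath: embedding the
  // attempt ID in the staging path makes each attempt's output location
  // unique, so a retry cannot hit FileAlreadyExistsException on leftovers.
  def dynamicStagingTaskPath(stagingDir: String, dir: String, attemptId: Int): String =
    s"$stagingDir/$dir-$attemptId"
}
```

Attempt 0 and attempt 1 of the same task thus stage to distinct directories.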
venkata91 commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r416026281

## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

```scala
@@ -236,13 +271,36 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite && isSpeculationEnabled) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext)
+        if (!fs.exists(partitionPath)) {
```

Review comment:
Also, rather than calling !fs.exists here and creating the directory, it would be better to create it as part of getDynamicPartitionPath itself if it doesn't exist.
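The suggestion above (fold directory creation into getDynamicPartitionPath) could look like the following. This is a hypothetical sketch using `java.nio.file` rather than Hadoop's `FileSystem` API; the name `getOrCreatePartitionPath` and its signature are assumptions, not the PR's code.

```scala
import java.nio.file.{Files, Path}

object PartitionPathSketch {
  // Resolve the partition directory and create it in the same call, so
  // callers need no separate exists()/mkdirs() pair. createDirectories is
  // a no-op when the directory already exists, so this is safe to repeat.
  def getOrCreatePartitionPath(destRoot: Path, partition: String): Path = {
    val p = destRoot.resolve(partition)
    Files.createDirectories(p)
    p
  }
}
```

Collapsing the check-then-create pair also removes one spot where two concurrent committers could interleave between the exists() check and the mkdirs().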
venkata91 commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r415368695

## File path: sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala

```scala
@@ -157,3 +164,142 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
     }
   }
 }
+
+private class DetectDynamicStagingTaskPartitionPathCommitProtocol(
+    jobId: String,
+    path: String,
+    dynamicPartitionOverwrite: Boolean)
+  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {
+
+  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
+    if (dynamicPartitionOverwrite && isSpeculationEnabled) {
+      val partitionPathSet = dynamicStagingTaskFiles
+        .map(taskFile => getDynamicPartitionPath(taskFile, taskContext))
+        .map(_.toUri.getPath.stripPrefix(stagingDir.toUri.getPath + Path.SEPARATOR))
+      assert(partitionPathSet.equals(partitionPaths))
+    }
+    super.commitTask(taskContext)
+  }
+}
+
+/**
+ * This file system is used to simulate the scenario where, for a dynamic partition overwrite
+ * operation with speculation enabled, renaming from dynamic staging files to final files fails
+ * for the task whose taskId and attemptId are 0, because another task has renamed a dynamic
+ * staging file to the final file.
+ */
+class AnotherTaskRenamedForFirstTaskFirstAttemptFileSystem extends RawLocalFileSystem {
+  override def rename(src: Path, dst: Path): Boolean = {
+    if (src.getName.startsWith("part-")) {
+      Try {
+        // File name format is part-$split%05d-$jobId$ext
+        val taskId = src.getName.split("-").apply(1).toInt
+        val attemptId = src.getParent.getName.split("-").last.toInt
+        taskId == 0 && attemptId == 0
+      } match {
+        case Success(shouldRenameFailed) if shouldRenameFailed =>
+          super.rename(src, dst)
+          false
+        case _ => super.rename(src, dst)
+      }
+    } else {
+      super.rename(src, dst)
+    }
+  }
+}
+
+/**
+ * This file system is used to simulate the scenario where, for a dynamic partition overwrite
+ * operation with speculation enabled, renaming from dynamic staging files to final files fails
+ * for the task whose taskId and attemptId are 0, and no other task has renamed a dynamic
+ * staging file to the final file.
+ */
+class RenameFailedForFirstTaskFirstAttemptFileSystem extends RawLocalFileSystem {
+  override def rename(src: Path, dst: Path): Boolean = {
+    if (src.getName.startsWith("part-")) {
```

Review comment:
Can we also mock the behavior of DistributedFileSystem for rename here, as in: if the directory does not exist, fail with an exception?
venkata91 commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r415224131

## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

```scala
@@ -236,13 +271,35 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite && isSpeculationEnabled) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext)
+        fs.mkdirs(partitionPath)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
+          if (fs.exists(finalFile)) {
+            logWarning(
+              s"""
+                 | For dynamic partition overwrite operation with speculation enabled, failed to
+                 | rename the staging dynamic file:$stagingTaskFile to $finalFile. Some other task
+                 | has renamed a staging dynamic file to $finalFile. See details in SPARK-29302.
+               """.stripMargin)
+          } else {
+            throw new IOException(s"Failed to rename $stagingTaskFile to $finalFile")
```

Review comment (on the logWarning message):
What happens if YARN preemption is enabled? Can the rename of the file happen but then the executor get preempted? In that case, this warning might not make sense. I guess we don't need to say anything specific to speculation.

Review comment (on `fs.mkdirs(partitionPath)`):
Check if the directory already exists; if not, call `fs.mkdirs(partitionPath)`.

Review comment (on `throw new IOException`):
If the rename failed, shouldn't we wrap and propagate that exception? Like permission denied, etc.
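The last point above (wrap and propagate the underlying failure) can be sketched as follows. This is a hypothetical helper, not the PR's code: `renameOrThrow` and its function-valued parameter are illustrative; the idea is that an exception thrown by the rename itself (e.g. permission denied) becomes the cause of the IOException instead of being lost behind a generic message.

```scala
import java.io.IOException

object RenameErrorSketch {
  // Preserve the root cause: only a plain `false` return from the rename
  // becomes a bare "failed to rename" error; anything the rename throws is
  // wrapped so the original exception survives as the cause.
  def renameOrThrow(rename: () => Boolean, src: String, dst: String): Unit = {
    val ok =
      try rename()
      catch {
        case e: Exception =>
          throw new IOException(s"Failed to rename $src to $dst", e)
      }
    if (!ok) throw new IOException(s"Failed to rename $src to $dst")
  }
}
```

This keeps a permission error distinguishable from a lost speculative race in the logs.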
venkata91 commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r415169284

## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

```scala
@@ -236,13 +267,25 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
+          throw new IOException(s"Failed to rename $stagingTaskFile to $finalFile")
```

Review comment (on the rename check):
This will fail on HDFS: without the parent directory existing, fs.rename won't work. So the partitionPath directory has to be created before renaming to the final location. Please refer to SPARK-23815.

Review comment (on `throw new IOException`):
In the case of speculated attempts, it's bound to come here, as some other task might have already renamed to the finalFile. Instead of throwing an exception, maybe we should log a warning with a better message.
venkata91 commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r410825389

## File path: sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala

```scala
@@ -157,3 +161,49 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
     }
   }
 }
+
+private class DetectDynamicSpeculationCommitProtocol(
+    jobId: String,
+    path: String,
+    dynamicPartitionOverwrite: Boolean)
+  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {
+
+  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
+    if (dynamicPartitionOverwrite) {
+      val partitionPathSet = dynamicStagingTaskFiles
+        .map(taskFile => getDynamicPartitionPath(taskFile, taskContext))
+        .map(_.toUri.getPath.stripPrefix(stagingDir.toUri.getPath + Path.SEPARATOR))
+      assert(partitionPathSet.equals(partitionPaths))
+    }
+    super.commitTask(taskContext)
+  }
+}
+
+class PartitionedSpeculateWriteSuite extends QueryTest with SharedSparkSession {
+  import testImplicits._
+
+  override def sparkConf(): SparkConf = {
+    super.sparkConf
+      .set(config.SPECULATION_MULTIPLIER, 0.0)
+      .set(config.SPECULATION_QUANTILE, 0.5)
+      .set(config.SPECULATION_ENABLED, true)
+  }
+
+  test("SPARK-27194 SPARK-29302: Fix the issue that for dynamic partition overwrite, a " +
+    "task would conflict with its speculative task") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+        classOf[DetectDynamicSpeculationCommitProtocol].getName) {
+      withTable("t") {
```

Review comment:
I was testing it locally; without the fix, this test still passes. Can you please check once?
venkata91 commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r405023852

## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

```scala
@@ -236,13 +265,25 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
```

Review comment:
So where does SparkHadoopMapRedUtil.commitTask move the data from/to, and how does dynamicPartitionOverwrite deal with that output? Can you please give an example? It seems there are two renames happening: one due to SparkHadoopMapRedUtil.commitTask, and then one in the code block inside dynamicPartitionOverwrite.
venkata91 commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r404983142

## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

```scala
@@ -236,13 +265,25 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
```

Review comment:
So where does SparkHadoopMapRedUtil.commitTask move the data from/to, and how does dynamicPartitionOverwrite deal with that output? Can you please give an example? It seems there are two renames happening: one due to SparkHadoopMapRedUtil.commitTask, and then one in the code block inside dynamicPartitionOverwrite.
venkata91 commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r403841433

## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

```scala
@@ -91,6 +91,31 @@ class HadoopMapReduceCommitProtocol(
    */
   private def stagingDir = new Path(path, ".spark-staging-" + jobId)

+  /**
+   * Get staging path for a task with dynamicPartitionOverwrite=true.
+   */
+  private def dynamicStagingTaskPath(dir: Option[String], taskContext: TaskAttemptContext): Path = {
+    assert(dynamicPartitionOverwrite && dir.isDefined)
```

Review comment:
These are already private defs; do we still need the assert?
venkata91 commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r403841559

## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

```scala
@@ -91,6 +91,31 @@ class HadoopMapReduceCommitProtocol(
    */
   private def stagingDir = new Path(path, ".spark-staging-" + jobId)

+  /**
+   * Get staging path for a task with dynamicPartitionOverwrite=true.
+   */
+  private def dynamicStagingTaskPath(dir: Option[String], taskContext: TaskAttemptContext): Path = {
+    assert(dynamicPartitionOverwrite && dir.isDefined)
+    val attemptID = taskContext.getTaskAttemptID.getId
+    new Path(stagingDir, s"${dir.get}-${attemptID}")
+  }
+
+  /**
+   * Tracks the staging task files with dynamicPartitionOverwrite=true.
+   */
+  @transient private var dynamicStagingTaskFiles: mutable.Set[Path] = null
+
+  /**
+   * Get responding partition path for a task with dynamicPartitionOverwrite=true.
+   */
+  private def getDynamicPartitionPath(stagingTaskFile: Path, context: TaskAttemptContext): Path = {
+    assert(dynamicPartitionOverwrite)
```

Review comment:
Same here, do we need the assert?