[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative ta
turboFei commented on a change in pull request #26339: URL: https://github.com/apache/spark/pull/26339#discussion_r416331191

## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

```diff
@@ -236,13 +272,33 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(fs, stagingTaskFile, taskContext)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
+          if (fs.exists(finalFile)) {
+            logWarning(
+              s"""
+                 | Some other task had renamed a staging dynamic file to $finalFile.
+                 | See details in SPARK-29302.
+               """.stripMargin)
+          } else {
+            throw new IOException(s"Failed to rename $stagingTaskFile to $finalFile")
```

Review comment: I have double-checked whether this finalFile exists.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
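The review above hinges on a check-then-rename that tolerates losing the race to a speculative attempt: skip the rename if the final file already exists, and re-check existence after a failed rename before raising an error. A minimal sketch of that flow, using plain `java.nio.file` instead of Hadoop's `FileSystem` (all paths and names here are hypothetical, not from the PR):

```scala
import java.io.IOException
import java.nio.file.{Files, FileSystemException, Path}

object RenameRaceSketch {
  // Returns true if this attempt's staging file became the final file,
  // false if another attempt already renamed its copy there.
  def moveToFinal(stagingTaskFile: Path, finalFile: Path): Boolean = {
    if (Files.exists(finalFile)) {
      false // another task attempt won the race; its output is equivalent
    } else {
      try {
        Files.move(stagingTaskFile, finalFile)
        true
      } catch {
        case _: FileSystemException =>
          // Re-check, mirroring the patch: losing a concurrent rename race
          // is fine, any other rename failure is a real error.
          if (Files.exists(finalFile)) false
          else throw new IOException(s"Failed to rename $stagingTaskFile to $finalFile")
      }
    }
  }

  // Two attempts produce the same logical output file; only one rename wins.
  def demo(): (Boolean, Boolean) = {
    val dir = Files.createTempDirectory("rename-race")
    val attempt0 = Files.createFile(dir.resolve("part-00000-attempt-0"))
    val attempt1 = Files.createFile(dir.resolve("part-00000-attempt-1"))
    val finalFile = dir.resolve("part-00000")
    (moveToFinal(attempt0, finalFile), moveToFinal(attempt1, finalFile))
  }
}
```

The double existence check matters because `rename` can fail for reasons other than the target existing; only the former case is benign here.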
turboFei commented on a change in pull request #26339: URL: https://github.com/apache/spark/pull/26339#discussion_r416265024

## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala (same hunk as quoted in the previous comment)

Review comment: thanks
turboFei commented on a change in pull request #26339: URL: https://github.com/apache/spark/pull/26339#discussion_r415242493

## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

```diff
@@ -236,13 +271,35 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite && isSpeculationEnabled) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext)
+        fs.mkdirs(partitionPath)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
+          if (fs.exists(finalFile)) {
+            logWarning(
+              s"""
+                 | For dynamic partition overwrite operation with speculation enabled, failed to
+                 | rename the staging dynamic file:$stagingTaskFile to $finalFile. Some other task
+                 | has renamed a staging dynamic file to $finalFile. See details in SPARK-29302.
+               """.stripMargin)
+          } else {
+            throw new IOException(s"Failed to rename $stagingTaskFile to $finalFile")
```

Review comment: I think it is not necessary; it would be wrapped as a `SparkException`.
turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task URL: https://github.com/apache/spark/pull/26339#discussion_r410834586

## File path: sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala

```diff
@@ -157,3 +161,49 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
     }
   }
 }
+
+private class DetectDynamicSpeculationCommitProtocol(
+    jobId: String,
+    path: String,
+    dynamicPartitionOverwrite: Boolean)
+  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {
+
+  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
+    if (dynamicPartitionOverwrite) {
+      val partitionPathSet = dynamicStagingTaskFiles
+        .map(taskFile => getDynamicPartitionPath(taskFile, taskContext))
+        .map(_.toUri.getPath.stripPrefix(stagingDir.toUri.getPath + Path.SEPARATOR))
+      assert(partitionPathSet.equals(partitionPaths))
+    }
+    super.commitTask(taskContext)
+  }
+}
+
+class PartitionedSpeculateWriteSuite extends QueryTest with SharedSparkSession {
+  import testImplicits._
+
+  override def sparkConf(): SparkConf = {
+    super.sparkConf
+      .set(config.SPECULATION_MULTIPLIER, 0.0)
+      .set(config.SPECULATION_QUANTILE, 0.5)
+      .set(config.SPECULATION_ENABLED, true)
+  }
+
+  test("SPARK-27194 SPARK-29302: Fix the issue that for dynamic partition overwrite, a " +
+    "task would conflict with its speculative task") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+        classOf[DetectDynamicSpeculationCommitProtocol].getName) {
+      withTable("t") {
```

Review comment: It does pass without this fix, because the local file system allows two tasks to write the same file concurrently. But a distributed file system does not allow two tasks to write the same file concurrently; an exception like `No lease on inode` is thrown for DFS.
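The `stripPrefix` step in the quoted test recovers the relative partition spec (e.g. `p=1`) from a staging task file path: drop the file name to get the dynamic partition path, then strip the job's staging prefix. A standalone sketch with hypothetical paths (the real code operates on Hadoop `Path` URIs, not raw strings):

```scala
object DynamicPartitionPathSketch {
  // Given the job's staging dir and a staging task file inside it, return
  // the relative partition directory, e.g. "p=1" or "p=1/q=2".
  def relativePartition(stagingDir: String, stagingTaskFile: String): String = {
    // Drop the file name, keeping only the partition directory.
    val partitionPath = stagingTaskFile.substring(0, stagingTaskFile.lastIndexOf('/'))
    // Strip the staging prefix (plus separator) to get the partition spec.
    partitionPath.stripPrefix(stagingDir + "/")
  }
}
```

This is what the test compares against `partitionPaths` to verify that every staging task file maps back to a tracked partition.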
turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task URL: https://github.com/apache/spark/pull/26339#discussion_r407858821

## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

```diff
@@ -236,13 +265,25 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
```

Review comment: Yeah, this log is a little confusing.
turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task URL: https://github.com/apache/spark/pull/26339#discussion_r407857789

## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

```diff
@@ -236,13 +267,25 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
```

Review comment: Will try to add related tests.
turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task URL: https://github.com/apache/spark/pull/26339#discussion_r407851801

## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

```diff
@@ -236,13 +267,25 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
```

Review comment: If the rename failed, partial results would be lost.
turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task URL: https://github.com/apache/spark/pull/26339#discussion_r407837974

## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

```diff
@@ -236,13 +267,25 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
```

Review comment: Thanks, I just checked the code. It seems that `SparkHadoopMapRedUtil.commitTask` has no meaning for dynamic partition overwrite, because the dynamic partition overwrite operation uses a specific staging dir (`.spark-staging-jobId`) instead of the working temporary dir of FileOutputCommitter (`_temporary/0`). So the conflict issue is not related to whether outputCommitCoordination is enabled.

> Another question is if it's possible the existing finalFile is corrupted due to another task crashing during commit.

It is possible, because there may be several dynamicStagingTaskFiles; if the task aborted while renaming these dynamicStagingTaskFiles to finalFiles, partial outputs are moved to the partition paths.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
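The partial-commit hazard described above can be reproduced in miniature: rename several staging files, crash midway, and observe that some final files are already in place. All file names below are hypothetical; this only illustrates the failure mode, not the PR's code.

```scala
import java.nio.file.Files
import scala.util.control.NonFatal

object PartialCommitSketch {
  // Returns the number of staging files that were renamed before the
  // simulated crash; those final files remain even though the task failed.
  def demo(): Int = {
    val dir = Files.createTempDirectory("partial-commit")
    val stagingFiles = (0 until 3).map(i => Files.createFile(dir.resolve(s"staging-$i")))
    var moved = 0
    try {
      stagingFiles.foreach { f =>
        if (moved == 2) throw new RuntimeException("task crashed mid-commit")
        Files.move(f, dir.resolve(s"final-$moved"))
        moved += 1
      }
    } catch {
      case NonFatal(_) => () // task abort: already-renamed files are left behind
    }
    moved
  }
}
```

This is why a crashed attempt can leave final files in the partition paths for a later attempt to find, which the existence check in commitTask has to tolerate.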
[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r407837974

File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

@@ -236,13 +267,25 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {

Review comment: Thanks, I just checked the code. It seems that `SparkHadoopMapRedUtil.commitTask` has no effect for dynamic partition overwrite, because the dynamicPartitionOverwrite operation uses a dedicated staging dir (`.spark-staging-jobId`) instead of the working temporary directory of FileOutputCommitter (`_temporary/0`). So the conflict issue is not related to whether outputCommitCoordination is enabled.

> Another question is if it's possible the existing finalFile is corrupted due to another task crashing during commit.

It is possible: there may be several dynamicStagingTaskFiles, so if the task aborts during `commitTask`, partial outputs may already have been moved to the partition paths.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards, Apache Git Services

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
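The two staging locations contrasted in the comment can be sketched with plain paths (illustrative only: the `outputPath` and `jobId` values are hypothetical, and `java.nio.file.Paths` stands in for Hadoop's `Path`):

```scala
import java.nio.file.Paths

// Hypothetical output path and job id.
val outputPath = "/warehouse/db/table"
val jobId = "20200408093844"

// Dedicated staging dir used when dynamicPartitionOverwrite=true:
val sparkStaging = Paths.get(outputPath, ".spark-staging-" + jobId)

// Working directory of FileOutputCommitter (application attempt 0),
// which dynamic partition overwrite does not go through:
val focTemporary = Paths.get(outputPath, "_temporary", "0")
```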
turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r407823308

File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

@@ -83,13 +83,38 @@ class HadoopMapReduceCommitProtocol(
    * e.g. a=1/b=2. Files under these partitions will be saved into staging directory and moved to
    * destination directory at the end, if `dynamicPartitionOverwrite` is true.
    */
-  @transient private var partitionPaths: mutable.Set[String] = null
+  @transient private[spark] var partitionPaths: mutable.Set[String] = null

   /**
    * The staging directory of this write job. Spark uses it to deal with files with absolute output
    * path, or writing data into partitioned directory with dynamicPartitionOverwrite=true.
    */
-  private def stagingDir = new Path(path, ".spark-staging-" + jobId)
+  private[spark] def stagingDir = new Path(path, ".spark-staging-" + jobId)
+
+  /**
+   * Tracks the staging task files with dynamicPartitionOverwrite=true.
+   */
+  @transient private[spark] var dynamicStagingTaskFiles: mutable.Set[Path] = null

Review comment: Just tested: it should be mutable, otherwise there is no `+=` method on immutable.Set.
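The `+=` point can be illustrated with a minimal, self-contained snippet (hypothetical element values; `String` stands in for Hadoop's `Path` so the snippet has no Hadoop dependency):

```scala
import scala.collection.mutable

// mutable.Set has an in-place += method, which is what the tracking field needs.
val dynamicStagingTaskFiles = mutable.Set[String]()
dynamicStagingTaskFiles += "/warehouse/.spark-staging-job1/a=1-0/part-00000"

// scala.collection.immutable.Set has no mutating +=: on a val of immutable
// Set type, `set += x` does not compile, hence the mutable declaration.
```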
turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r407822740

File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

Review comment (on `if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {` in commitTask): I think the finalFile may exist when `spark.hadoop.outputCommitCoordination.enabled` is false. It is true by default.
turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r405247218

File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

Review comment (on `if (!fs.rename(stagingTaskFile, destFile)) {` in commitTask): Got it, I will add a check. Thanks.
turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r405245208

File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuiteWithSpeculation.scala

Review comment (on `class SQLQuerySuiteWithSpeculation extends QueryTest with SharedSparkSession`; the new file begins with the standard Apache license header and imports): Before setting SPECULATION_MULTIPLIER to 0, I could not guarantee that a concurrent speculative task would be launched. After setting `SPECULATION_MULTIPLIER` to 0 and `SPECULATION_QUANTILE` to 0.5, I can ensure that some speculative tasks will be launched.

Referred to the unit test of SPARK-22074: https://github.com/apache/spark/blob/a3d83948b81cb5c47f6e084801373443f54747d8/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala#L782-L788
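The speculation settings described above can be sketched as plain key/value pairs (standard Spark configuration keys; a plain `Map` stands in for a real `SparkConf` so the snippet carries no Spark dependency):

```scala
// Sketch of the speculation configuration the comment describes.
val speculationConf = Map(
  // Enable speculative execution of tasks.
  "spark.speculation" -> "true",
  // Multiplier 0: any running task counts as slow relative to the median,
  // so a speculative copy can be launched immediately.
  "spark.speculation.multiplier" -> "0",
  // Start speculating once 50% of the tasks in a stage have finished.
  "spark.speculation.quantile" -> "0.5"
)
```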
turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r405243756

File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuiteWithSpeculation.scala

Review comment (on `class SQLQuerySuiteWithSpeculation extends QueryTest with SharedSparkSession`): I tried to add a unit test for HadoopMapReduceCommitProtocol alone, but it seems that this class is internal and is only exercised through the sql module.
turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r405241541

File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

Review comment (on `if (!fs.rename(stagingTaskFile, destFile)) {` in commitTask): We can add a check here, and if destFile already exists, I think we can remove it directly, because this operation happens after SparkHadoopMapRedUtil.commitTask. If outputCommitCoordination is enabled, only one task can commit its output; otherwise, it does not matter if we remove it directly.
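The "remove it directly, then move" behavior discussed here can be sketched on a local filesystem (a minimal sketch, assuming `java.nio.file` in place of the Hadoop FileSystem API; `moveStagedFile` is a hypothetical helper, not the PR's code):

```scala
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical helper: move a staged task file into its final partition
// directory, replacing any file left by an earlier attempt. Safe here because
// it runs after SparkHadoopMapRedUtil.commitTask, so with output commit
// coordination enabled only one attempt reaches this point.
def moveStagedFile(stagingTaskFile: Path, partitionDir: Path): Path = {
  Files.createDirectories(partitionDir)
  val destFile = partitionDir.resolve(stagingTaskFile.getFileName)
  // REPLACE_EXISTING removes an existing destFile before the move,
  // mirroring the "remove it directly" option in the comment.
  Files.move(stagingTaskFile, destFile, StandardCopyOption.REPLACE_EXISTING)
}
```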
turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r405241847

File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuiteWithSpeculation.scala

Review comment (on `class SQLQuerySuiteWithSpeculation extends QueryTest with SharedSparkSession`): Do you mean that this is an integration test and I just need to test HadoopMapReduceCommitProtocol?
turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r405241581

File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

@@ -91,6 +91,29 @@ class HadoopMapReduceCommitProtocol(
   private def stagingDir = new Path(path, ".spark-staging-" + jobId)

+  /**
+   * Get staging path for a task with dynamicPartitionOverwrite=true.
+   */
+  private def dynamicStagingTaskPath(dir: Option[String], taskContext: TaskAttemptContext): Path = {
+    val attemptID = taskContext.getTaskAttemptID.getId
+    new Path(stagingDir, s"${dir.get}-${attemptID}")
+  }
+
+  /**
+   * Tracks the staging task files with dynamicPartitionOverwrite=true.
+   */
+  @transient private var dynamicStagingTaskFiles: mutable.Set[Path] = null

Review comment (on the `dynamicStagingTaskFiles` declaration): Thanks.
turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r405241600

File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

Review comment (on the `dynamicStagingTaskPath` definition, `new Path(stagingDir, s"${dir.get}-${attemptID}")`): Thanks.
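The naming scheme in `dynamicStagingTaskPath` can be illustrated with plain paths (hypothetical values; `java.nio.file.Paths` stands in for Hadoop's `Path`):

```scala
import java.nio.file.Paths

// Hypothetical values illustrating the layout: each task attempt writes under
// <stagingDir>/<partition dir>-<attempt id>, so concurrent attempts for the
// same partition never share a staging directory.
val stagingDir = Paths.get("/warehouse/db/table/.spark-staging-job1")
val dir = Some("a=1/b=2")  // dynamic partition subdirectory
val attemptID = 3
val stagingTaskPath = Paths.get(stagingDir.toString, s"${dir.get}-${attemptID}")
```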
turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r403982165

## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

@@ -91,6 +91,31 @@ class HadoopMapReduceCommitProtocol(
    */
   private def stagingDir = new Path(path, ".spark-staging-" + jobId)

+  /**
+   * Get staging path for a task with dynamicPartitionOverwrite=true.
+   */
+  private def dynamicStagingTaskPath(dir: Option[String], taskContext: TaskAttemptContext): Path = {
+    assert(dynamicPartitionOverwrite && dir.isDefined)
+    val attemptID = taskContext.getTaskAttemptID.getId
+    new Path(stagingDir, s"${dir.get}-${attemptID}")
+  }
+
+  /**
+   * Tracks the staging task files with dynamicPartitionOverwrite=true.
+   */
+  @transient private var dynamicStagingTaskFiles: mutable.Set[Path] = null
+
+  /**
+   * Get responding partition path for a task with dynamicPartitionOverwrite=true.
+   */
+  private def getDynamicPartitionPath(stagingTaskFile: Path, context: TaskAttemptContext): Path = {
+    assert(dynamicPartitionOverwrite)

Review comment: will remove it
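The patch encodes the task attempt ID into each per-partition staging directory name (`s"${dir.get}-${attemptID}"`), so two concurrent attempts never share a staging path, and the partition path can be recovered later by stripping that suffix. A small Python sketch of this naming scheme (the layout and the suffix-stripping helper are my reading of the diff, not the exact Spark implementation):

```python
from pathlib import Path

# Hypothetical staging root for one job, mirroring ".spark-staging-" + jobId.
STAGING_ROOT = Path(".spark-staging-job_20200401_0001")

def dynamic_staging_task_path(partition_dir: str, attempt_id: int) -> Path:
    """Staging dir for one (partition, task attempt) pair:
    "<stagingDir>/<partitionDir>-<attemptId>", as in dynamicStagingTaskPath."""
    return STAGING_ROOT / f"{partition_dir}-{attempt_id}"

def dynamic_partition_path(staging_file: Path, attempt_id: int) -> Path:
    """Recover the partition directory from a staged file's parent by
    stripping the "-<attemptId>" suffix (a guess at what
    getDynamicPartitionPath does with the task context)."""
    suffix = f"-{attempt_id}"
    parent = staging_file.parent.name
    assert parent.endswith(suffix)
    return STAGING_ROOT / parent[: -len(suffix)]

# A task attempt with ID 3 stages its file for partition dt=2020-04-01:
staged_dir = dynamic_staging_task_path("dt=2020-04-01", 3)   # .../dt=2020-04-01-3
partition = dynamic_partition_path(staged_dir / "part-00000", 3)
```

Because the suffix includes only the attempt ID, knowing that ID (available from the `TaskAttemptContext` at commit time) is what makes the mapping reversible.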
turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r403982105

## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

@@ -91,6 +91,31 @@ class HadoopMapReduceCommitProtocol(
    */
   private def stagingDir = new Path(path, ".spark-staging-" + jobId)

+  /**
+   * Get staging path for a task with dynamicPartitionOverwrite=true.
+   */
+  private def dynamicStagingTaskPath(dir: Option[String], taskContext: TaskAttemptContext): Path = {
+    assert(dynamicPartitionOverwrite && dir.isDefined)

Review comment: will remove it
turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r403044978

## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

@@ -236,13 +271,23 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    stagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      stagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val taskPartitionPath = getPartitionPath(stagingTaskFile)
+        val destFile = new Path(new Path(stagingDir, taskPartitionPath), fileName)
+        fs.rename(stagingTaskFile, destFile)

Review comment: fixed
turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r355823338

## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

@@ -236,13 +271,23 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    stagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      stagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val taskPartitionPath = getPartitionPath(stagingTaskFile)
+        val destFile = new Path(new Path(stagingDir, taskPartitionPath), fileName)
+        fs.rename(stagingTaskFile, destFile)

Review comment: Thanks for your suggestion, but I think we cannot do this. For dynamic partition overwrite, each task may write data to many partition paths. If we just renamed a folder, it would overwrite other tasks' output.
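The point in this comment is that committing by renaming a whole task directory would clobber files that other tasks already placed in the same dynamic partition, whereas moving staged files one by one merges the outputs. A Python sketch of the file-by-file merge, with hypothetical directory names, showing that two tasks writing distinct files into the same partition both survive:

```python
import tempfile
from pathlib import Path

def merge_task_output(task_dir: Path, partition_root: Path) -> None:
    """Move each staged file into its partition dir individually, so output
    from several tasks writing the same partition is merged, not clobbered
    (a directory-level rename would replace earlier tasks' files wholesale)."""
    for f in sorted(task_dir.rglob("*")):   # sorted() materializes before moving
        if f.is_file():
            dest_dir = partition_root / f.parent.relative_to(task_dir)
            dest_dir.mkdir(parents=True, exist_ok=True)
            f.rename(dest_dir / f.name)

root = Path(tempfile.mkdtemp())
out = root / "out"
# Two tasks each staged one file under the same dynamic partition p=1.
for task, fname in [("task-0", "part-00000"), ("task-1", "part-00001")]:
    d = root / task / "p=1"
    d.mkdir(parents=True)
    (d / fname).write_text(task)

merge_task_output(root / "task-0", out)
merge_task_output(root / "task-1", out)
merged = sorted(p.name for p in (out / "p=1").iterdir())
# Both tasks' files coexist; renaming task-1/p=1 onto out/p=1 instead
# would have discarded task-0's part-00000.
```

Per-file moves are slower on object stores than a single directory rename, but they are the only option here because partition directories are shared across tasks.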