[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-27 Thread GitBox


turboFei commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r416331191



##
File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##
@@ -236,13 +272,33 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(fs, stagingTaskFile, taskContext)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
+          if (fs.exists(finalFile)) {
+            logWarning(
+              s"""
+                 | Some other task had renamed a staging dynamic file to $finalFile.
+                 | See details in SPARK-29302.
+               """.stripMargin)
+          } else {
+            throw new IOException(s"Failed to rename $stagingTaskFile to $finalFile")

Review comment:
   I have double-checked whether this finalFile exists.
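
   For reference, a minimal standalone sketch of the exists/rename/re-check pattern the diff above uses, assuming only the Hadoop FileSystem API (the helper name is illustrative, not the PR's code):

    import java.io.IOException
    import org.apache.hadoop.fs.{FileSystem, Path}

    // If the rename fails, check again whether a concurrent speculative attempt
    // already renamed its own staging file to the same final location; only an
    // unexplained rename failure is treated as an error.
    def moveToFinal(fs: FileSystem, stagingTaskFile: Path, finalFile: Path): Unit = {
      if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
        if (fs.exists(finalFile)) {
          // another attempt won the race and produced finalFile; safe to continue
        } else {
          throw new IOException(s"Failed to rename $stagingTaskFile to $finalFile")
        }
      }
    }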








[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-27 Thread GitBox


turboFei commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r416265024



##
File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##
@@ -236,13 +272,33 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(fs, stagingTaskFile, taskContext)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
+          if (fs.exists(finalFile)) {
+            logWarning(
+              s"""
+                 | Some other task had renamed a staging dynamic file to $finalFile.
+                 | See details in SPARK-29302.
+               """.stripMargin)
+          } else {
+            throw new IOException(s"Failed to rename $stagingTaskFile to $finalFile")

Review comment:
   thanks








[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-26 Thread GitBox


turboFei commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r415242493



##
File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##
@@ -236,13 +271,35 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite && isSpeculationEnabled) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext)
+        fs.mkdirs(partitionPath)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
+          if (fs.exists(finalFile)) {
+            logWarning(
+              s"""
+                 | For dynamic partition overwrite operation with speculation enabled, failed to
+                 | rename the staging dynamic file:$stagingTaskFile to $finalFile. Some other task
+                 | has renamed a staging dynamic file to $finalFile. See details in SPARK-29302.
+               """.stripMargin)
+          } else {
+            throw new IOException(s"Failed to rename $stagingTaskFile to $finalFile")

Review comment:
   I think it is not necessary; it would be wrapped as a `SparkException`.




[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-19 Thread GitBox
turboFei commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r410834586
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
 ##
 @@ -157,3 +161,49 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
     }
   }
 }
+
+private class DetectDynamicSpeculationCommitProtocol(
+    jobId: String,
+    path: String,
+    dynamicPartitionOverwrite: Boolean)
+  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {
+
+  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
+    if (dynamicPartitionOverwrite) {
+      val partitionPathSet = dynamicStagingTaskFiles
+        .map(taskFile => getDynamicPartitionPath(taskFile, taskContext))
+        .map(_.toUri.getPath.stripPrefix(stagingDir.toUri.getPath + Path.SEPARATOR))
+      assert(partitionPathSet.equals(partitionPaths))
+    }
+    super.commitTask(taskContext)
+  }
+}
+
+class PartitionedSpeculateWriteSuite extends QueryTest with SharedSparkSession {
+  import testImplicits._
+
+  override def sparkConf(): SparkConf = {
+    super.sparkConf
+      .set(config.SPECULATION_MULTIPLIER, 0.0)
+      .set(config.SPECULATION_QUANTILE, 0.5)
+      .set(config.SPECULATION_ENABLED, true)
+  }
+
+  test("SPARK-27194 SPARK-29302: Fix the issue that for dynamic partition overwrite, a " +
+    "task would conflict with its speculative task") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+        classOf[DetectDynamicSpeculationCommitProtocol].getName) {
+      withTable("t") {
 
 Review comment:
   It does pass without this fix, because the local file system allows two tasks to write the same file concurrently.
   But a distributed file system does not allow two tasks to write to the same file concurrently; an exception like `no lease on inode` is thrown for a DFS.


[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-13 Thread GitBox
turboFei commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r407858821
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##
 @@ -236,13 +265,25 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
 
 Review comment:
   Yeah, this log is a little confusing.





[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-13 Thread GitBox
turboFei commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r407857789
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##
 @@ -236,13 +267,25 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
 
 Review comment:
   Will try to add relevant tests.





[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-13 Thread GitBox
turboFei commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r407851801
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##
 @@ -236,13 +267,25 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
 
 Review comment:
   If the rename failed, partial results would be lost.





[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-13 Thread GitBox
turboFei commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r407837974
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##
 @@ -236,13 +267,25 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
 
 Review comment:
   Thanks, I just checked the code.
   It seems that `SparkHadoopMapRedUtil.commitTask` has no meaning for dynamic partition overwrite, because the dynamic partition overwrite operation uses a specific staging dir (`.spark-staging-jobId`) instead of the working temporary dir of FileOutputCommitter (`_temporary/0`).

   So the conflict issue is not related to whether outputCommitCoordination is enabled.

   > Another question is if it's possible the existing finalFile is corrupted due to another task crashing during commit.

   It is possible: there may be several dynamicStagingTaskFiles, and if the task aborts while renaming these dynamicStagingTaskFiles to finalFiles, partial outputs are moved to the partition paths.
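
   As a rough sketch of the layout being described (the table path and job id below are hypothetical; only the `.spark-staging-` and `_temporary/0` names come from this discussion):

    import org.apache.hadoop.fs.Path

    // Dynamic partition overwrite stages task output under a job-specific
    // directory next to the output path, not under FileOutputCommitter's
    // working directory.
    val outputPath = new Path("/warehouse/t")         // hypothetical table location
    val jobId = "20200413100000-0001"                 // hypothetical job id
    val stagingDir = new Path(outputPath, ".spark-staging-" + jobId)
    val committerWorkDir = new Path(outputPath, "_temporary/0")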
   



[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-13 Thread GitBox
turboFei commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r407823308
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##
 @@ -83,13 +83,38 @@ class HadoopMapReduceCommitProtocol(
    * e.g. a=1/b=2. Files under these partitions will be saved into staging directory and moved to
    * destination directory at the end, if `dynamicPartitionOverwrite` is true.
    */
-  @transient private var partitionPaths: mutable.Set[String] = null
+  @transient private[spark] var partitionPaths: mutable.Set[String] = null

   /**
    * The staging directory of this write job. Spark uses it to deal with files with absolute output
    * path, or writing data into partitioned directory with dynamicPartitionOverwrite=true.
    */
-  private def stagingDir = new Path(path, ".spark-staging-" + jobId)
+  private[spark] def stagingDir = new Path(path, ".spark-staging-" + jobId)
+
+  /**
+   * Tracks the staging task files with dynamicPartitionOverwrite=true.
+   */
+  @transient private[spark] var dynamicStagingTaskFiles: mutable.Set[Path] = null
 
 Review comment:
   Just tested: it should be mutable, otherwise there is no `+=` method on an immutable.Set.
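
   A tiny sketch of the difference with plain Scala collections:

    import scala.collection.mutable

    val staged = mutable.Set[String]()   // mutable.Set defines an in-place +=
    staged += "part-00000"

    val frozen = Set("part-00000")       // immutable.Set has no += method
    // frozen += "part-00001"            // does not compile on a val
    val grown = frozen + "part-00001"    // + returns a new set instead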
   





[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-13 Thread GitBox
turboFei commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r407822740
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##
 @@ -236,13 +267,25 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
 
 Review comment:
   I think the finalFile may exist when `spark.hadoop.outputCommitCoordination.enabled` is false.
   It is true by default.
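
   For illustration, a sketch of how that flag could be disabled (the key string is taken from this comment):

    import org.apache.spark.SparkConf

    // Disabling output commit coordination (true by default) allows more than
    // one task attempt to commit, which is when finalFile may already exist.
    val conf = new SparkConf()
      .set("spark.hadoop.outputCommitCoordination.enabled", "false")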





[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-07 Thread GitBox
turboFei commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r405247218
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##
 @@ -236,13 +265,25 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val taskPartitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext)
+        val destFile = new Path(taskPartitionPath, fileName)
+        if (!fs.rename(stagingTaskFile, destFile)) {
 
 Review comment:
   Got it, I will add a check. Thanks.





[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-07 Thread GitBox
turboFei commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r405245208
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuiteWithSpeculation.scala
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+class SQLQuerySuiteWithSpeculation extends QueryTest with SharedSparkSession
 
 Review comment:
   Before setting SPECULATION_MULTIPLIER to 0, I could not guarantee that a concurrent speculative task would be launched.

   After setting `SPECULATION_MULTIPLIER` to 0 and `SPECULATION_QUANTILE` to 0.5, I can ensure that some speculative tasks will be launched.
   Referred to the unit test of SPARK-22074:
   
https://github.com/apache/spark/blob/a3d83948b81cb5c47f6e084801373443f54747d8/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala#L782-L788
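
   The same setup expressed with raw configuration keys, as a sketch (the string keys below are assumed to correspond to the config entries named above):

    import org.apache.spark.SparkConf

    // With multiplier = 0 every running task counts as "slow", and with
    // quantile = 0.5 speculation starts once half of the tasks have finished,
    // so speculative attempts are launched deterministically in the test.
    val conf = new SparkConf()
      .set("spark.speculation", "true")
      .set("spark.speculation.multiplier", "0")
      .set("spark.speculation.quantile", "0.5")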
   
   





[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-07 Thread GitBox
turboFei commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r405243756
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuiteWithSpeculation.scala
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+class SQLQuerySuiteWithSpeculation extends QueryTest with SharedSparkSession
 
 Review comment:
   I tried to add a unit test for HadoopMapReduceCommitProtocol only, but it seems that this class is internal and only used by the sql module.





[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-07 Thread GitBox
turboFei commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r405241541
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##
 @@ -236,13 +265,25 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val taskPartitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext)
+        val destFile = new Path(taskPartitionPath, fileName)
+        if (!fs.rename(stagingTaskFile, destFile)) {
 
 Review comment:
   We can add a check here: if destFile already exists, I think we can remove 
it directly, because this operation happens after 
SparkHadoopMapRedUtil.commitTask.
   If outputCommitCoordination is enabled, only one task can commit its output.
   Otherwise, it does not matter if we remove it directly.
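   
   A hedged sketch of that check (helper name and wiring are illustrative, not 
this PR's final code): because it runs after SparkHadoopMapRedUtil.commitTask, 
a pre-existing destFile is treated as a stale leftover and deleted before the 
rename, instead of failing the task:
   
```scala
import java.io.IOException

import org.apache.hadoop.fs.{FileSystem, Path}

object RenameSketch {
  // Move a committed staging file into place, removing any stale leftover first.
  def renameOverStale(fs: FileSystem, stagingTaskFile: Path, destFile: Path): Unit = {
    if (fs.exists(destFile) && !fs.delete(destFile, false)) {
      throw new IOException(s"Failed to delete stale file $destFile")
    }
    if (!fs.rename(stagingTaskFile, destFile)) {
      throw new IOException(s"Failed to rename $stagingTaskFile to $destFile")
    }
  }
}
```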


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative ta

2020-04-07 Thread GitBox
turboFei commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r405241847
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuiteWithSpeculation.scala
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+class SQLQuerySuiteWithSpeculation extends QueryTest with SharedSparkSession
 
 Review comment:
   Do you mean that this is an integration test and I just need to test the 
HadoopMapReduceCommitProtocol?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative ta

2020-04-07 Thread GitBox
turboFei commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r405241581
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##
 @@ -91,6 +91,29 @@ class HadoopMapReduceCommitProtocol(
*/
   private def stagingDir = new Path(path, ".spark-staging-" + jobId)
 
+  /**
+   * Get staging path for a task with dynamicPartitionOverwrite=true.
+   */
+  private def dynamicStagingTaskPath(dir: Option[String], taskContext: TaskAttemptContext): Path = {
+    val attemptID = taskContext.getTaskAttemptID.getId
+    new Path(stagingDir, s"${dir.get}-${attemptID}")
+  }
+
+  /**
+   * Tracks the staging task files with dynamicPartitionOverwrite=true.
+   */
+  @transient private var dynamicStagingTaskFiles: mutable.Set[Path] = null
 
 Review comment:
   thanks


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative ta

2020-04-07 Thread GitBox
turboFei commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r405241600
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##
 @@ -91,6 +91,29 @@ class HadoopMapReduceCommitProtocol(
*/
   private def stagingDir = new Path(path, ".spark-staging-" + jobId)
 
+  /**
+   * Get staging path for a task with dynamicPartitionOverwrite=true.
+   */
+  private def dynamicStagingTaskPath(dir: Option[String], taskContext: TaskAttemptContext): Path = {
 
 Review comment:
   thanks


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative ta

2020-04-06 Thread GitBox
turboFei commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r403982165
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##
 @@ -91,6 +91,31 @@ class HadoopMapReduceCommitProtocol(
*/
   private def stagingDir = new Path(path, ".spark-staging-" + jobId)
 
+  /**
+   * Get staging path for a task with dynamicPartitionOverwrite=true.
+   */
+  private def dynamicStagingTaskPath(dir: Option[String], taskContext: TaskAttemptContext): Path = {
+    assert(dynamicPartitionOverwrite && dir.isDefined)
+    val attemptID = taskContext.getTaskAttemptID.getId
+    new Path(stagingDir, s"${dir.get}-${attemptID}")
+  }
+
+  /**
+   * Tracks the staging task files with dynamicPartitionOverwrite=true.
+   */
+  @transient private var dynamicStagingTaskFiles: mutable.Set[Path] = null
+
+  /**
+   * Get responding partition path for a task with dynamicPartitionOverwrite=true.
+   */
+  private def getDynamicPartitionPath(stagingTaskFile: Path, context: TaskAttemptContext): Path = {
+    assert(dynamicPartitionOverwrite)
 
 Review comment:
   will remove it


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative ta

2020-04-06 Thread GitBox
turboFei commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r403982105
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##
 @@ -91,6 +91,31 @@ class HadoopMapReduceCommitProtocol(
*/
   private def stagingDir = new Path(path, ".spark-staging-" + jobId)
 
+  /**
+   * Get staging path for a task with dynamicPartitionOverwrite=true.
+   */
+  private def dynamicStagingTaskPath(dir: Option[String], taskContext: TaskAttemptContext): Path = {
+    assert(dynamicPartitionOverwrite && dir.isDefined)
 
 Review comment:
   will remove it


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative ta

2020-04-03 Thread GitBox
turboFei commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r403044978
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##
 @@ -236,13 +271,23 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    stagingTaskFiles = mutable.Set[Path]()
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      stagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val taskPartitionPath = getPartitionPath(stagingTaskFile)
+        val destFile = new Path(new Path(stagingDir, taskPartitionPath), fileName)
+        fs.rename(stagingTaskFile, destFile)
 
 Review comment:
   fixed


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] turboFei commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative ta

2019-12-09 Thread GitBox
turboFei commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r355823338
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##
 @@ -236,13 +271,23 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    stagingTaskFiles = mutable.Set[Path]()
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      stagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val taskPartitionPath = getPartitionPath(stagingTaskFile)
+        val destFile = new Path(new Path(stagingDir, taskPartitionPath), fileName)
+        fs.rename(stagingTaskFile, destFile)
 
 Review comment:
   Thanks for your suggestion, but I don't think we can do this.
   With dynamic partition overwrite, each task may write data to many 
partition paths.
   If we just renamed a folder, it would overwrite other tasks' output.
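   
   A hypothetical layout (paths made up) to illustrate the point, followed by 
the per-file move it implies:
   
```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical layout: two tasks write to the same partition p=1, each from
// its own staging directory:
//   .spark-staging-job1/p=1-attempt0/part-00000   <- task 0
//   .spark-staging-job1/p=1-attempt1/part-00001   <- task 1
// Renaming a whole staging directory onto the shared target ".../p=1" would
// replace whatever the other task already moved there, so each task instead
// moves its own files one by one into the shared partition directory.
object PerFileMoveSketch {
  def moveOwnFiles(fs: FileSystem, ownFiles: Seq[Path], partitionDir: Path): Unit = {
    ownFiles.foreach { f =>
      fs.rename(f, new Path(partitionDir, f.getName))
    }
  }
}
```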


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org