[GitHub] [spark] venkata91 commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-27 Thread GitBox


venkata91 commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r416259866



##
File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##
@@ -236,13 +272,33 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(fs, stagingTaskFile, taskContext)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
+          if (fs.exists(finalFile)) {
+            logWarning(
+              s"""
+                 | Some other task had renamed a staging dynamic file to $finalFile.
+                 | See details in SPARK-29302.
+               """.stripMargin)
+          } else {
+            throw new IOException(s"Failed to rename $stagingTaskFile to $finalFile")

Review comment:
   @turboFei It seems that even with outputCoordinatorEnabled, there are cases
with speculation where the other task also tries to commit and fails with an
IOException because the finalFile already exists. I think this needs to be
fixed: committer.needsTaskCommit appears to be false in those cases. I don't
think throwing an IOException is right here.
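
For illustration only, a minimal sketch of the more tolerant shape this comment argues for, reusing the names from the diff above (fs, dynamicStagingTaskFiles, getDynamicPartitionPath); re-checking existence after a failed rename treats a file committed by a speculative sibling as benign and only fails when the output is genuinely missing:

    dynamicStagingTaskFiles.foreach { stagingTaskFile =>
      val finalFile = new Path(
        getDynamicPartitionPath(fs, stagingTaskFile, taskContext), stagingTaskFile.getName)
      val renamed = !fs.exists(finalFile) && fs.rename(stagingTaskFile, finalFile)
      // Re-check after a failed rename: a sibling attempt may have won the race
      // between our exists() check and our rename() call (SPARK-29302).
      if (!renamed && !fs.exists(finalFile)) {
        throw new IOException(s"Failed to rename $stagingTaskFile to $finalFile")
      }
    }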








[GitHub] [spark] venkata91 commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-27 Thread GitBox


venkata91 commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r416026281



##
File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##
@@ -118,7 +147,13 @@ class HadoopMapReduceCommitProtocol(
     }
 
     dir.map { d =>
-      new Path(new Path(stagingDir, d), filename).toString
+      if (dynamicPartitionOverwrite && isSpeculationEnabled) {

Review comment:
   I think this issue will happen even when speculation is not enabled. When
a task fails (say, due to an executor OOM or various other reasons), it
leaves partial data behind, and when the new attempt executes, it fails with
a FileAlreadyExistsException.
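
For what it's worth, a sketch of the newTaskTempFile branch with the isSpeculationEnabled gate dropped, so that a retried attempt always stages into its own attempt-scoped directory; this reuses the PR's dynamicStagingTaskPath helper quoted later in this thread, and elides the getOrElse branch:

    dir.map { d =>
      if (dynamicPartitionOverwrite) {
        // .spark-staging-<jobId>/<partition>-<attemptId>/<filename> is unique per
        // attempt, so a retry never collides with a failed attempt's partial file.
        new Path(dynamicStagingTaskPath(dir, taskContext), filename).toString
      } else {
        new Path(new Path(stagingDir, d), filename).toString
      }
    }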








[GitHub] [spark] venkata91 commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-27 Thread GitBox


venkata91 commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r416026281



##
File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##
@@ -236,13 +271,36 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite && isSpeculationEnabled) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext)
+        if (!fs.exists(partitionPath)) {

Review comment:
   Also, rather than calling !fs.exists here and then creating the directory,
it would be better to create it inside getDynamicPartitionPath itself if it
doesn't exist.
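
A sketch of what that could look like; the name getOrCreateDynamicPartitionPath is made up here for illustration, and the FileSystem/Path/TaskAttemptContext imports are assumed to be in scope as in the file above:

    private def getOrCreateDynamicPartitionPath(
        fs: FileSystem, stagingTaskFile: Path, context: TaskAttemptContext): Path = {
      val partitionPath = getDynamicPartitionPath(stagingTaskFile, context)
      // Create the partition directory in one place so no caller has to
      // repeat the exists/mkdirs dance before renaming into it.
      if (!fs.exists(partitionPath)) {
        fs.mkdirs(partitionPath)
      }
      partitionPath
    }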








[GitHub] [spark] venkata91 commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-26 Thread GitBox


venkata91 commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r415368695



##
File path: sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
##
@@ -157,3 +164,142 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
     }
   }
 }
+
+private class DetectDynamicStagingTaskPartitionPathCommitProtocol(
+    jobId: String,
+    path: String,
+    dynamicPartitionOverwrite: Boolean)
+  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {
+
+  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
+    if (dynamicPartitionOverwrite && isSpeculationEnabled) {
+      val partitionPathSet = dynamicStagingTaskFiles
+        .map(taskFile => getDynamicPartitionPath(taskFile, taskContext))
+        .map(_.toUri.getPath.stripPrefix(stagingDir.toUri.getPath + Path.SEPARATOR))
+      assert(partitionPathSet.equals(partitionPaths))
+    }
+    super.commitTask(taskContext)
+  }
+}
+
+/**
+ * This file system is used to simulate the scene that for dynamic partition overwrite operation
+ * with speculation enabled, rename from dynamic staging files to final files failed for the task,
+ * whose taskId and attemptId are 0, because another task has renamed a dynamic staging file to the
+ * final file.
+ */
+class AnotherTaskRenamedForFirstTaskFirstAttemptFileSystem extends RawLocalFileSystem {
+  override def rename(src: Path, dst: Path): Boolean = {
+    if (src.getName.startsWith("part-")) {
+      Try {
+        // File name format is part-$split%05d-$jobId$ext
+        val taskId = src.getName.split("-").apply(1).toInt
+        val attemptId = src.getParent.getName.split("-").last.toInt
+        taskId == 0 && attemptId == 0
+      } match {
+        case Success(shouldRenameFailed) if shouldRenameFailed =>
+          super.rename(src, dst)
+          false
+        case _ => super.rename(src, dst)
+      }
+    } else {
+      super.rename(src, dst)
+    }
+  }
+}
+
+/**
+ * This file system is used to simulate the scene that for dynamic partition overwrite operation
+ * with speculation enabled, rename from dynamic staging files to final files failed for the task,
+ * whose taskId and attemptId are 0, and no other task has renamed a dynamic staging file to the
+ * final file.
+ */
+class RenameFailedForFirstTaskFirstAttemptFileSystem extends RawLocalFileSystem {
+  override def rename(src: Path, dst: Path): Boolean = {
+    if (src.getName.startsWith("part-")) {

Review comment:
   Can we also mock the behavior of DistributedFileSystem's rename here, i.e.
if the destination directory does not exist, fail with an exception?
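
A hedged sketch of such a mock; the class name is made up here, and the Path/RawLocalFileSystem imports are assumed as in the suite above. DistributedFileSystem's boolean rename(src, dst) reports failure when the destination's parent directory is missing, rather than creating it the way a local rename path might:

    class MissingParentRenameFailsFileSystem extends RawLocalFileSystem {
      override def rename(src: Path, dst: Path): Boolean = {
        // Mimic HDFS semantics: refuse the rename when the destination
        // directory does not exist (return false here, or throw instead to
        // exercise the exception path this comment asks about).
        if (!exists(dst.getParent)) {
          false
        } else {
          super.rename(src, dst)
        }
      }
    }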








[GitHub] [spark] venkata91 commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-25 Thread GitBox


venkata91 commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r415224131



##
File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##
@@ -236,13 +271,35 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite && isSpeculationEnabled) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext)
+        fs.mkdirs(partitionPath)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
+          if (fs.exists(finalFile)) {
+            logWarning(
+              s"""
+                 | For dynamic partition overwrite operation with speculation enabled, failed to

Review comment:
   What happens if YARN preemption is enabled? Can the file rename succeed
but the executor then get preempted? In that case this warning might not make
sense. I guess we don't need to say anything specific to speculation.

##
File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##
@@ -236,13 +271,35 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite && isSpeculationEnabled) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext)
+        fs.mkdirs(partitionPath)

Review comment:
   Check if the directory already exists; if not, call
`fs.mkdirs(partitionPath)`.

##
File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##
@@ -236,13 +271,35 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite && isSpeculationEnabled) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext)
+        fs.mkdirs(partitionPath)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
+          if (fs.exists(finalFile)) {
+            logWarning(
+              s"""
+                 | For dynamic partition overwrite operation with speculation enabled, failed to
+                 | rename the staging dynamic file:$stagingTaskFile to $finalFile. Some other task
+                 | has renamed a staging dynamic file to $finalFile. See details in SPARK-29302.
+               """.stripMargin)
+          } else {
+            throw new IOException(s"Failed to rename $stagingTaskFile to $finalFile")

Review comment:
   If the rename failed, shouldn't we wrap and propagate the underlying
exception, e.g. permission denied?
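
A sketch of that propagation, reusing the names from the diff above:

    val renamed =
      try {
        fs.rename(stagingTaskFile, finalFile)
      } catch {
        // Keep the root cause (e.g. an AccessControlException for permission
        // denied) attached instead of swallowing it.
        case e: IOException =>
          throw new IOException(s"Failed to rename $stagingTaskFile to $finalFile", e)
      }
    if (!renamed) {
      throw new IOException(s"Failed to rename $stagingTaskFile to $finalFile")
    }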






[GitHub] [spark] venkata91 commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-25 Thread GitBox


venkata91 commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r415169284



##
File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##
@@ -236,13 +267,25 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {

Review comment:
   This will fail on HDFS: fs.rename won't work without the parent directory
in place, so the partitionPath directory has to be created before renaming
the file to its final location. Please refer to SPARK-23815.
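
To make the HDFS behaviour concrete, a small hedged illustration with made-up paths (not the PR's code):

    // FileSystem.rename on HDFS returns false when the destination's parent
    // directory does not exist; it will not create it for you.
    val src = new Path("/warehouse/t/.spark-staging-job1/p=1-0/part-00000")
    val dst = new Path("/warehouse/t/.spark-staging-job1/p=1/part-00000")
    assert(!fs.rename(src, dst))   // parent of dst is missing, rename fails
    fs.mkdirs(dst.getParent)       // create the partition dir first (cf. SPARK-23815)
    assert(fs.rename(src, dst))    // now the rename succeeds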

##
File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##
@@ -236,13 +267,25 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
+          throw new IOException(s"Failed to rename $stagingTaskFile to $finalFile")

Review comment:
   In the case of speculated attempts, it is bound to reach this point, since
some other task might already have renamed to the finalFile. Instead of
throwing an exception, maybe we should log a warning with a better message.








[GitHub] [spark] venkata91 commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-18 Thread GitBox
venkata91 commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r410825389
 
 

##
File path: sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
##
@@ -157,3 +161,49 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
     }
   }
 }
+
+private class DetectDynamicSpeculationCommitProtocol(
+    jobId: String,
+    path: String,
+    dynamicPartitionOverwrite: Boolean)
+  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {
+
+  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
+    if (dynamicPartitionOverwrite) {
+      val partitionPathSet = dynamicStagingTaskFiles
+        .map(taskFile => getDynamicPartitionPath(taskFile, taskContext))
+        .map(_.toUri.getPath.stripPrefix(stagingDir.toUri.getPath + Path.SEPARATOR))
+      assert(partitionPathSet.equals(partitionPaths))
+    }
+    super.commitTask(taskContext)
+  }
+}
+
+class PartitionedSpeculateWriteSuite extends QueryTest with SharedSparkSession {
+  import testImplicits._
+
+  override def sparkConf(): SparkConf = {
+    super.sparkConf
+      .set(config.SPECULATION_MULTIPLIER, 0.0)
+      .set(config.SPECULATION_QUANTILE, 0.5)
+      .set(config.SPECULATION_ENABLED, true)
+  }
+
+  test("SPARK-27194 SPARK-29302: Fix the issue that for dynamic partition overwrite, a " +
+    "task would conflict with its speculative task") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+        classOf[DetectDynamicSpeculationCommitProtocol].getName) {
+      withTable("t") {
 
Review comment:
   I was testing it locally; without the fix this test still passes. Can you
please check once?





[GitHub] [spark] venkata91 commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-07 Thread GitBox
venkata91 commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r405023852
 
 

##
File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##
@@ -236,13 +265,25 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
 
Review comment:
   So where does SparkHadoopMapRedUtil.commitTask move the data from and to,
and how does dynamicPartitionOverwrite deal with that output? Can you please
give an example? It seems there are two renames happening: one due to
SparkHadoopMapRedUtil.commitTask, and then one in the code block inside
dynamicPartitionOverwrite.
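
For readers of the archive, a rough sketch of the flow being asked about, with made-up values (jobId job1, partition p=1, task 0, attempt 0); treat this as an illustration, not a confirmed answer from the PR author:

    write (task attempt):  <table>/.spark-staging-job1/p=1-0/part-00000-job1
    commitTask (this PR):  <table>/.spark-staging-job1/p=1/part-00000-job1    (rename 1, per file)
    commitJob:             <table>/p=1/part-00000-job1                        (rename 2, per partition)

SparkHadoopMapRedUtil.commitTask itself only drives the Hadoop OutputCommitter (coordinated through the OutputCommitCoordinator); under dynamicPartitionOverwrite the task files are written beneath the job's staging directory rather than the committer's work path, so the committer has nothing of its own to move for them, and the two renames above both come from this protocol.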





[GitHub] [spark] venkata91 commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-07 Thread GitBox
venkata91 commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r404983142
 
 

##
File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##
@@ -236,13 +265,25 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
 
Review comment:
   So where does SparkHadoopMapRedUtil.commitTask move the data from and to,
and how does dynamicPartitionOverwrite deal with that output? Can you please
give an example? It seems there are two renames happening: one due to
SparkHadoopMapRedUtil.commitTask, and then one in the code block inside
dynamicPartitionOverwrite.








[GitHub] [spark] venkata91 commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-05 Thread GitBox
venkata91 commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r403841433
 
 

##
File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##
@@ -91,6 +91,31 @@ class HadoopMapReduceCommitProtocol(
    */
   private def stagingDir = new Path(path, ".spark-staging-" + jobId)
 
+  /**
+   * Get staging path for a task with dynamicPartitionOverwrite=true.
+   */
+  private def dynamicStagingTaskPath(dir: Option[String], taskContext: TaskAttemptContext): Path = {
+    assert(dynamicPartitionOverwrite && dir.isDefined)
 
Review comment:
   These are already private defs; do we still need the assert?





[GitHub] [spark] venkata91 commented on a change in pull request #26339: [SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition overwrite a task would conflict with its speculative task

2020-04-05 Thread GitBox
venkata91 commented on a change in pull request #26339: 
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition 
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r403841559
 
 

##
File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##
@@ -91,6 +91,31 @@ class HadoopMapReduceCommitProtocol(
    */
   private def stagingDir = new Path(path, ".spark-staging-" + jobId)
 
+  /**
+   * Get staging path for a task with dynamicPartitionOverwrite=true.
+   */
+  private def dynamicStagingTaskPath(dir: Option[String], taskContext: TaskAttemptContext): Path = {
+    assert(dynamicPartitionOverwrite && dir.isDefined)
+    val attemptID = taskContext.getTaskAttemptID.getId
+    new Path(stagingDir, s"${dir.get}-${attemptID}")
+  }
+
+  /**
+   * Tracks the staging task files with dynamicPartitionOverwrite=true.
+   */
+  @transient private var dynamicStagingTaskFiles: mutable.Set[Path] = null
+
+  /**
+   * Get responding partition path for a task with dynamicPartitionOverwrite=true.
+   */
+  private def getDynamicPartitionPath(stagingTaskFile: Path, context: TaskAttemptContext): Path = {
+    assert(dynamicPartitionOverwrite)
 
Review comment:
   Same here, do we need the assert?

