advancedxy commented on a change in pull request #25863: 
[WIP][SPARK-29037][CORE][SQL] For static partition overwrite, spark may give duplicate result.
URL: https://github.com/apache/spark/pull/25863#discussion_r327986135
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##########
 @@ -91,7 +97,40 @@ class HadoopMapReduceCommitProtocol(
    */
   private def stagingDir = new Path(path, ".spark-staging-" + jobId)
 
+  /**
+   * The staging root directory for InsertIntoHadoopFsRelation operation.
+   */
+  @transient private var insertStagingDir: Path = null
+
+  @transient private var stagingOutputPath: Path = null
+
+  /**
+   * Get the desired output path for the job.
+   */
+  protected def getOutputPath(context: TaskAttemptContext): Path = {
+    if (isInsertIntoHadoopFsRelation) {
+      val overwriteFlag = if (isOverwrite) "overwrite-" else "append-"
+      val insertStagingPath = ".spark-staging-" + overwriteFlag + staticPartitionKVs.size
+      insertStagingDir = new Path(path, insertStagingPath)
+      val appId = SparkEnv.get.conf.getAppId
+      val outputPath = new Path(path, Array(insertStagingPath,
+        getStaticPartitionPath(staticPartitionKVs), appId, jobId).mkString(File.separator))
+      insertStagingDir.getFileSystem(context.getConfiguration).makeQualified(outputPath)
+      outputPath
+    } else {
+      new Path(path)
+    }
+  }
+
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
+    if (isInsertIntoHadoopFsRelation) {
+      stagingOutputPath = getOutputPath(context)
+      context.getConfiguration.set(FileOutputFormat.OUTDIR, stagingOutputPath.toString)
+      // Set file output committer to 2 implicitly, for that the task output would be
+      // committed to staging output path firstly, which is equivalent to algorithm 1.
+      context.getConfiguration.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 2)
 
 Review comment:
   I believe a warning log is needed here to notify the user that we set the committer algorithm version to 2.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to