cloud-fan commented on a change in pull request #33002:
URL: https://github.com/apache/spark/pull/33002#discussion_r657610207



##########
File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##########
@@ -152,12 +153,21 @@ class HadoopMapReduceCommitProtocol(
     tmpOutputPath
   }
 
-  protected def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
-    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
-    // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
+  protected def getFilename(ext: String): String = {
+    // Use the Spark task attempt ID, which is unique within the write job, so that file writes
+    // never collide as long as the file name also includes the job ID. The Hadoop task ID is
+    // equivalent to Spark's partitionId, which is not unique within the write job in cases like
+    // task retries or speculative tasks.
+    // NOTE: this is not necessary for certain Hadoop output committers, as they create a unique
+    // staging directory for each task attempt, so we don't need to worry about file name
+    // collisions between different task attempts, and using the Hadoop task ID/Spark partition ID
+    // is also fine. For extra safety and consistency with the streaming side, we always use the
+    // Spark task attempt ID here.
+    val taskId = TaskContext.get.taskAttemptId()
+    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+    // Note that %05d does not truncate the taskId, so if we have more than 100000 tasks,
     // the file name is fine and won't overflow.
-    val split = taskContext.getTaskAttemptID.getTaskID.getId
-    f"part-$split%05d-$jobId$ext"
+    f"part-$taskId%05d-$jobId$ext"

Review comment:
       Spark has: job ID -> stage ID -> partition ID
    
    The job ID is simply a UUID.
    The stage ID is an integer starting from 0, globally unique within the Spark application.
    The partition ID is an integer starting from 0, unique within a stage.
    
    Each task not only has a partition ID, but also an attempt ID, which is an integer starting from 0, globally unique within the Spark application. There is also an attempt number, which starts from 0 and increases by one for each attempt of this task.
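    
    A minimal sketch of reading these IDs inside a running task via org.apache.spark.TaskContext (these accessors are part of Spark's public TaskContext API; the comments restate the uniqueness guarantees above):
    
    ```scala
    import org.apache.spark.TaskContext
    
    // Must run on an executor, inside a task.
    val tc = TaskContext.get()
    val stageId       = tc.stageId()        // Int: globally unique within the application
    val partitionId   = tc.partitionId()    // Int: unique within a stage
    val taskAttemptId = tc.taskAttemptId()  // Long: globally unique within the application
    val attemptNumber = tc.attemptNumber()  // Int: 0 for the first attempt, +1 per retry
    ```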



