cloud-fan commented on a change in pull request #33002:
URL: https://github.com/apache/spark/pull/33002#discussion_r655506168



##########
File path: hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
##########
@@ -134,7 +134,7 @@ class PathOutputCommitProtocol(
     val parent = dir.map {
       d => new Path(workDir, d)
     }.getOrElse(workDir)
-    val file = new Path(parent, getFilename(taskContext, ext))
+    val file = new Path(parent, getFilename(ext))

Review comment:
       I'm not sure if this change affects this new committer, but I think it should be a positive change. @steveloughran

##########
File path: hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
##########
@@ -134,7 +134,7 @@ class PathOutputCommitProtocol(
     val parent = dir.map {
       d => new Path(workDir, d)
     }.getOrElse(workDir)
-    val file = new Path(parent, getFilename(taskContext, ext))
+    val file = new Path(parent, getFilename(ext))

Review comment:
       I'm not sure if this change affects this new committer, but I think it should be a positive change. The file name now uses the Spark task attempt ID instead of the partition ID, so it stays unique across task retries and speculative attempts.
   
    @steveloughran
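
   For illustration only, a rough sketch of the naming difference with made-up values (not code from this PR):

```scala
// Hypothetical values, just to show the shape of the names.
val jobId = "2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb"
val ext = ".gz.parquet"

// Before: the number is the Hadoop task ID, i.e. the Spark partition ID,
// so a retry or speculative attempt of partition 3 produces the same name.
val partitionId = 3                      // taskContext.getTaskAttemptID.getTaskID.getId
val oldName = f"part-$partitionId%05d-$jobId$ext"
// part-00003-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb.gz.parquet

// After: the number is the Spark task attempt ID, unique within the write job,
// so two attempts of the same partition get different names.
val taskAttemptId = 42L                  // TaskContext.get.taskAttemptId()
val newName = f"part-$taskAttemptId%05d-$jobId$ext"
// part-00042-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb.gz.parquet
```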

##########
File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##########
@@ -152,12 +153,21 @@ class HadoopMapReduceCommitProtocol(
     tmpOutputPath
   }
 
-  protected def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
-    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
-    // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
+  protected def getFilename(ext: String): String = {
+    // Use the Spark task attempt ID which is unique within the write job, so that file writes never
+    // collide if the file name also includes job ID. The Hadoop task id is equivalent to Spark's
+    // partitionId, which is not unique within the write job, for cases like task retry or
+    // speculative tasks.
+    // NOTE: this is not necessary for certain Hadoop output committers, as they will create a
+    // unique staging directory for each task attempt, so we don't need to worry about file name
+    // collision between different task attempts, and using Hadoop task ID/Spark partition ID is
+    // also fine. For extra safety and consistency with the streaming side, we always use the
+    // Spark task attempt ID here.
+    val taskId = TaskContext.get.taskAttemptId()
+    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+    // Note that %05d does not truncate the taskId, so if we have more than 100000 tasks,
     // the file name is fine and won't overflow.
-    val split = taskContext.getTaskAttemptID.getTaskID.getId
-    f"part-$split%05d-$jobId$ext"
+    f"part-$taskId%05d-$jobId$ext"

Review comment:
       A more aggressive way is to simply use a fresh UUID here, but I'm not sure if that's better. cc @zsxwing
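
   A minimal sketch of that alternative, using a hypothetical helper name (not what the PR does):

```scala
import java.util.UUID

// Hypothetical sketch of the "fresh UUID per file" alternative: the name can
// never collide, but it no longer encodes which task attempt produced the file.
def getFilenameWithUuid(jobId: String, ext: String): String = {
  val uuid = UUID.randomUUID.toString
  s"part-$uuid-$jobId$ext"
}
```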

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
##########
@@ -113,12 +113,15 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
 
   override def newTaskTempFile(
       taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
-    // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
-    // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
+    // Use the Spark task attempt ID which is unique within the write job, so that file writes never
+    // collide if the file name also includes job ID. The Hadoop task id is equivalent to Spark's
+    // partitionId, which is not unique within the write job, for cases like task retry or
+    // speculative tasks.
+    val taskId = TaskContext.get.taskAttemptId()
+    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+    // Note that %05d does not truncate the taskId, so if we have more than 100000 tasks,
     // the file name is fine and won't overflow.
-    val split = taskContext.getTaskAttemptID.getTaskID.getId
-    val uuid = UUID.randomUUID.toString
-    val filename = f"part-$split%05d-$uuid$ext"

Review comment:
       Currently the streaming side generates a fresh UUID per file to avoid name collisions, but I think that's overkill: "task attempt id + job id" avoids collisions just as well and is more consistent with the batch side.
   
   It may also be useful to include the job ID in the file name like the batch side does, so that people can see which files were written by the same job.
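
   Roughly, that suggestion would make the streaming file name follow the batch rule (hypothetical sketch, not the PR's code):

```scala
import org.apache.spark.TaskContext

// Hypothetical sketch: reuse the batch naming rule on the streaming side,
// so the job ID appears in the name and the per-file random UUID goes away.
// Must run on an executor inside a task, where TaskContext.get is non-null.
def streamingFilename(jobId: String, ext: String): String = {
  val taskId = TaskContext.get.taskAttemptId()  // unique within the write job
  f"part-$taskId%05d-$jobId$ext"
}
```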

##########
File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##########
@@ -152,12 +153,21 @@ class HadoopMapReduceCommitProtocol(
     tmpOutputPath
   }
 
-  protected def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
-    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
-    // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
+  protected def getFilename(ext: String): String = {
+    // Use the Spark task attempt ID which is unique within the write job, so that file writes never
+    // collide if the file name also includes job ID. The Hadoop task id is equivalent to Spark's
+    // partitionId, which is not unique within the write job, for cases like task retry or
+    // speculative tasks.
+    // NOTE: this is not necessary for certain Hadoop output committers, as they will create a
+    // unique staging directory for each task attempt, so we don't need to worry about file name
+    // collision between different task attempts, and using Hadoop task ID/Spark partition ID is
+    // also fine. For extra safety and consistency with the streaming side, we always use the
+    // Spark task attempt ID here.
+    val taskId = TaskContext.get.taskAttemptId()
+    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+    // Note that %05d does not truncate the taskId, so if we have more than 100000 tasks,
     // the file name is fine and won't overflow.
-    val split = taskContext.getTaskAttemptID.getTaskID.getId
-    f"part-$split%05d-$jobId$ext"
+    f"part-$taskId%05d-$jobId$ext"

Review comment:
       The value will be very different. For one query, the partition ID always starts from 0, but the task attempt ID is unique within a Spark application and is not reset for a new query.
   
   If we do want to keep the `part-00000` prefix for some reason, we can also apply the naming rule from streaming to batch. I'm not sure anyone cares about the final naming, though; the commit protocols I'm aware of only care about file listing.
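
   A toy illustration of that difference (plain Scala, not Spark internals): partition IDs restart from 0 for every query, while the task attempt ID is an application-wide counter.

```scala
// Simulate two queries of 3 tasks each with an application-wide attempt counter.
var nextTaskAttemptId = 0L
for (query <- Seq("q1", "q2"); partitionId <- 0 until 3) {
  val taskAttemptId = nextTaskAttemptId
  nextTaskAttemptId += 1
  println(f"$query: partition=$partitionId%05d taskAttemptId=$taskAttemptId%05d")
}
// q1: partition=00000 taskAttemptId=00000
// ...
// q2: partition=00000 taskAttemptId=00003  <- the "part-00000" prefix is gone for q2
```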



