steveloughran commented on a change in pull request #33002:
URL: https://github.com/apache/spark/pull/33002#discussion_r657429293



##########
File path: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##########
@@ -152,12 +153,21 @@ class HadoopMapReduceCommitProtocol(
     tmpOutputPath
   }
 
-  protected def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
-    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
-    // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
+  protected def getFilename(ext: String): String = {
+    // Use the Spark task attempt ID which is unique within the write job, so that file writes never
+    // collide if the file name also includes job ID. The Hadoop task id is equivalent to Spark's
+    // partitionId, which is not unique within the write job, for cases like task retry or
+    // speculative tasks.
+    // NOTE: this is not necessary for certain Hadoop output committers, as they will create a
+    // unique staging directory for each task attempt, so we don't need to worry about file name
+    // collision between different task attempts, and using Hadoop task ID/Spark partition ID is
+    // also fine. For extra safety and consistency with the streaming side, we always use the
+    // Spark task attempt ID here.
+    val taskId = TaskContext.get.taskAttemptId()
+    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+    // Note that %05d does not truncate the taskId, so if we have more than 100000 tasks,
     // the file name is fine and won't overflow.
-    val split = taskContext.getTaskAttemptID.getTaskID.getId
-    f"part-$split%05d-$jobId$ext"
+    f"part-$taskId%05d-$jobId$ext"

Review comment:
       The part number is handy for a bit of blame assignment. FWIW, the name "part" can 
be configured via `mapreduce.output.basename`. I don't know if anyone does.
   
   Now, the v2 committer, whose lack of task commit idempotency is well known, is 
only going to be able to recover from a failure partway through task attempt 
commit if the second attempt creates files with the same names. This should not 
be a barrier to having better names as, well, it's still broken.
   
   > But task attempt id is unique within a spark application and won't be 
reset for a new query.
   
   Is this true? I think we really need to understand the differences between Spark 
job, task and attempt IDs and the YARN ones, which, as we know, had duplicate 
job IDs until SPARK-33402.
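
   To make the naming question concrete, here is a minimal sketch of the format string from the diff. It assumes nothing about Spark itself: `FileNameSketch`, `fileName` and the attempt IDs 7 and 12 are hypothetical, and the job ID is just the example UUID from the code comment. It shows that `%05d` pads but never truncates, and that two attempts of the same partition get the same name when the partition ID is used but different names when the attempt ID is used.

```scala
object FileNameSketch {
  // Same format string as in the diff; `id` is whichever ID the caller picks
  // (Hadoop task ID / Spark partition ID, or Spark task attempt ID).
  def fileName(id: Long, jobId: String, ext: String): String =
    f"part-$id%05d-$jobId$ext"

  def main(args: Array[String]): Unit = {
    val jobId = "2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb" // example UUID from the diff comment
    val ext = ".gz.parquet"

    val partitionId = 3L          // identical for every attempt of the task
    val attemptIds = Seq(7L, 12L) // hypothetical IDs of two attempts of that task

    // Naming by partition ID: both attempts produce the same file name.
    val byPartition = attemptIds.map(_ => fileName(partitionId, jobId, ext))
    println(byPartition.distinct.size)

    // Naming by attempt ID: each attempt produces its own file name.
    val byAttempt = attemptIds.map(id => fileName(id, jobId, ext))
    println(byAttempt.distinct.size)

    // %05d is a minimum width, so a six-digit ID is kept whole.
    println(fileName(123456L, jobId, ext))
  }
}
```

   Identical names are what lets a second attempt overwrite the leftovers of a failed v2 task commit; distinct names avoid collisions but give up that accidental recovery path.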
   

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
##########
@@ -113,12 +113,15 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
 
   override def newTaskTempFile(
       taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
-    // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
-    // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
+    // Use the Spark task attempt ID which is unique within the write job, so that file writes never
+    // collide if the file name also includes job ID. The Hadoop task id is equivalent to Spark's
+    // partitionId, which is not unique within the write job, for cases like task retry or
+    // speculative tasks.
+    val taskId = TaskContext.get.taskAttemptId()

Review comment:
       Specifically: the Hadoop Task ID MUST be the same for all task attempts, so 
that committers can commit the output of more than one task attempt by renaming 
the Task Attempt dir to output/_temporary/jobAttempt/taskID; as only one 
task commit can do this (assuming the FS has atomic rename; Google GCS doesn't), 
you get unique output. My WiP manifest committer creates a JSON manifest with 
the task ID in the filename for the same reason: only one file can be committed by 
file rename (atomic on GCS as well as Azure).
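
   A rough sketch of that "first rename wins" idea, using `java.nio.file` on a local temp directory as a stand-in for a filesystem with atomic rename. This is not the Hadoop `FileSystem` API or any real committer; `RenameCommitSketch`, `commitTask` and the directory names are hypothetical. Note that `Files.move` checks for the target before renaming, so it only illustrates the outcome and is not a concurrency-safe primitive.

```scala
import java.nio.file.{FileAlreadyExistsException, Files, Path}

object RenameCommitSketch {
  // Try to commit one attempt by renaming its directory to the path derived
  // from the task ID. Returns true if this attempt became the committed one.
  def commitTask(attemptDir: Path, committedTaskDir: Path): Boolean =
    try {
      Files.move(attemptDir, committedTaskDir) // fails if the target already exists
      true
    } catch {
      case _: FileAlreadyExistsException => false
    }

  // Two attempts of the same task each try to commit to the same destination.
  def demo(): (Boolean, Boolean) = {
    val root = Files.createTempDirectory("commit-sketch")
    val attempt0 = Files.createDirectory(root.resolve("attempt_0"))
    val attempt1 = Files.createDirectory(root.resolve("attempt_1"))
    Files.write(attempt0.resolve("data"), "from attempt 0".getBytes("UTF-8"))
    Files.write(attempt1.resolve("data"), "from attempt 1".getBytes("UTF-8"))

    // Both attempts share the task ID, so both target the same directory.
    val committed = root.resolve("task_00003")
    (commitTask(attempt0, committed), commitTask(attempt1, committed))
  }

  def main(args: Array[String]): Unit =
    println(demo())
}
```

   Because the destination depends only on the task ID, the second attempt's rename is rejected and exactly one attempt's output survives.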

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
##########
@@ -147,9 +147,13 @@ class FileStreamSink(
     if (batchId <= fileLog.getLatestBatchId().getOrElse(-1L)) {
       logInfo(s"Skipping already committed batch $batchId")
     } else {
+      // To avoid file name collision, we should generate a new job ID for every write job, instead
+      // of using batchId, as we may use the same batchId to write files again, if the streaming job
+      // fails and we restore from the checkpoint.
+      val jobId = java.util.UUID.randomUUID().toString

Review comment:
       One change of SPARK-33402 was including some timestamp/version info. 
That's potentially quite handy later, just to see when things were created and in what order.
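
   For illustration only, one way to keep the uniqueness of a random UUID while adding a sortable creation time. The format here is made up for this sketch and is not what SPARK-33402 does; `JobIdSketch` and `newJobId` are hypothetical names.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter
import java.util.UUID

object JobIdSketch {
  // UTC timestamp with second resolution, so IDs sort by creation time.
  private val fmt =
    DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(ZoneOffset.UTC)

  // Hypothetical job ID: <timestamp>-<random UUID>. The UUID supplies
  // uniqueness; the timestamp prefix is only there for humans and ordering.
  def newJobId(now: Instant = Instant.now()): String =
    s"${fmt.format(now)}-${UUID.randomUUID()}"

  def main(args: Array[String]): Unit =
    println(newJobId())
}
```

   Two jobs started in the same second still get different IDs because of the UUID part, and listing the output directory shows roughly when each file was written.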

##########
File path: 
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
##########
@@ -134,7 +134,7 @@ class PathOutputCommitProtocol(
     val parent = dir.map {
       d => new Path(workDir, d)
     }.getOrElse(workDir)
-    val file = new Path(parent, getFilename(taskContext, ext))
+    val file = new Path(parent, getFilename(ext))

Review comment:
       Commit protocols MUST NOT contain any assumptions about filenames. It 
would be silly.
   
   Well, almost not. Try creating a file ending in .pending or .pendingset in the 
magic committer and it'd be very confused. (Maybe we should change that to 
something really obscure...)



