cloud-fan commented on a change in pull request #33002:
URL: https://github.com/apache/spark/pull/33002#discussion_r657620317



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
##########
@@ -113,12 +113,15 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
 
   override def newTaskTempFile(
       taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
-    // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
-    // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
+    // Use the Spark task attempt ID, which is unique within the write job, so that file writes
+    // never collide as long as the file name also includes the job ID. The Hadoop task ID is
+    // equivalent to Spark's partitionId, which is not unique within the write job in cases like
+    // task retries or speculative tasks.
+    val taskId = TaskContext.get.taskAttemptId()
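For illustration, here is a minimal sketch of the naming idea in this hunk. The helper name and the exact file-name format are hypothetical, not the PR's actual code; the point is that the Spark task attempt ID is unique per attempt within the job, while the partition ID is not:

```scala
import org.apache.spark.TaskContext

// Hypothetical helper, for illustration only; it must run inside a Spark
// task, where TaskContext.get returns a non-null context.
def uniqueTempFileName(jobId: String, ext: String): String = {
  // taskAttemptId() is unique across all task attempts in the job,
  // including retried and speculative attempts of the same partition.
  val taskId = TaskContext.get.taskAttemptId()
  f"part-$taskId%05d-$jobId$ext"
}

// By contrast, TaskContext.get.partitionId() is shared by every attempt of
// a partition, so two concurrent attempts could produce the same file name.
```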

Review comment:
       > Hadoop Task ID MUST be the same for all task attempts
   
   This doesn't change; the Hadoop Task ID is still set up the same way here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L268
   
   This PR only unifies the file names generated by the built-in Spark file commit protocol; it doesn't change any Hadoop Job/Task settings (see the sketch below).
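A simplified sketch of what that FileFormatWriter line sets up, with assumed parameter names rather than the exact Spark source: the Hadoop TaskID is built from the Spark partition ID, so every attempt of a partition shares the same Hadoop Task ID, and only the trailing attempt number differs.

```scala
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}

// Simplified sketch: all attempts of a given partition get the same Hadoop
// TaskID (derived from the Spark partition ID); only the attempt number at
// the end of the TaskAttemptID varies between attempts.
def hadoopTaskAttemptId(
    jobTrackerId: String,
    stageId: Int,
    sparkPartitionId: Int,
    sparkAttemptNumber: Int): TaskAttemptID = {
  val jobId = new JobID(jobTrackerId, stageId)
  val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
  new TaskAttemptID(taskId, sparkAttemptNumber)
}
```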



