cloud-fan commented on a change in pull request #33002:
URL: https://github.com/apache/spark/pull/33002#discussion_r657620317
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
##########
@@ -113,12 +113,15 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
   override def newTaskTempFile(
       taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
-    // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
-    // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
+    // Use the Spark task attempt ID which is unique within the write job, so that file writes never
+    // collide if the file name also includes job ID. The Hadoop task id is equivalent to Spark's
+    // partitionId, which is not unique within the write job, for cases like task retry or
+    // speculative tasks.
+    val taskId = TaskContext.get.taskAttemptId()
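The collision argument in the new code comment can be illustrated with a minimal, dependency-free Scala sketch. It assumes a simplified `part-<id>-<jobId><ext>` name loosely modeled on the old comment's example; the `Attempt` class and `fileName` helper are hypothetical, with `partitionId` standing in for `TaskContext.partitionId()` and `attemptId` for `TaskContext.taskAttemptId()`. It is not Spark's actual file-naming code.

```scala
// Hypothetical model of one task attempt (not a Spark class):
// partitionId mirrors TaskContext.partitionId(), i.e. the Hadoop task id,
// attemptId mirrors TaskContext.taskAttemptId(), unique within the SparkContext.
final case class Attempt(partitionId: Int, attemptId: Long)

// Illustrative file-name builder; %05d zero-pads but never truncates.
def fileName(id: Long, jobId: String, ext: String): String =
  f"part-$id%05d-$jobId$ext"

val jobId = "2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb"

// Partition 3 is attempted twice, e.g. a retry or a speculative copy.
val attempts = Seq(Attempt(3, 17L), Attempt(3, 42L))

// Naming by partition id: both attempts target the same file.
val byPartition = attempts.map(a => fileName(a.partitionId, jobId, ".parquet"))
// Naming by task attempt id: each attempt writes its own file.
val byAttempt = attempts.map(a => fileName(a.attemptId, jobId, ".parquet"))

println(byPartition.distinct.size)
println(byAttempt.distinct.size)
```

Running it prints `1` and then `2`: only the attempt-based names stay distinct when the same partition is written more than once.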
Review comment:
> Hadoop Task ID MUST be the same for all task attempts

This doesn't change:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L268

This PR only unifies the file names generated by the built-in Spark file commit protocol; it doesn't change anything in the Hadoop job/task setup.
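The separation the reply describes, with Hadoop IDs unchanged and only the file name keyed differently, can be sketched as follows. This is a simplified model under stated assumptions: the Hadoop task ID is taken to be derived from the Spark partition ID and the Hadoop attempt number from the Spark attempt number, as the linked `FileFormatWriter` line appears to do. `HadoopTaskId`, `HadoopAttemptId`, `SparkAttempt`, `hadoopAttemptId`, and `tempFileName` are hypothetical stand-ins, not Hadoop's `TaskID`/`TaskAttemptID` classes or Spark's API.

```scala
// Hypothetical stand-ins for Hadoop's TaskID / TaskAttemptID.
final case class HadoopTaskId(id: Int)
final case class HadoopAttemptId(task: HadoopTaskId, attempt: Int)

// One Spark task attempt: partitionId and attemptNumber are per-task values,
// taskAttemptId is unique across all attempts in the SparkContext.
final case class SparkAttempt(partitionId: Int, attemptNumber: Int, taskAttemptId: Long)

// Hadoop IDs come from the partition id (assumed unchanged by the PR),
// so every attempt of the same partition shares one Hadoop task id.
def hadoopAttemptId(s: SparkAttempt): HadoopAttemptId =
  HadoopAttemptId(HadoopTaskId(s.partitionId), s.attemptNumber)

// Only the output file name is keyed on the Spark task attempt id.
def tempFileName(s: SparkAttempt, jobId: String): String =
  f"part-${s.taskAttemptId}%05d-$jobId.parquet"

val original = SparkAttempt(partitionId = 3, attemptNumber = 0, taskAttemptId = 17L)
val retry = SparkAttempt(partitionId = 3, attemptNumber = 1, taskAttemptId = 42L)

// Same Hadoop task id for both attempts, as the Hadoop contract requires...
println(hadoopAttemptId(original).task == hadoopAttemptId(retry).task)
// ...while the two attempts still write to different files.
println(tempFileName(original, "job") != tempFileName(retry, "job"))
```

Both lines print `true`: the Hadoop task-level contract holds while file names no longer collide.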