viirya commented on a change in pull request #33002:
URL: https://github.com/apache/spark/pull/33002#discussion_r657691597
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
##########
@@ -113,12 +113,15 @@ class ManifestFileCommitProtocol(jobId: String, path:
String)
override def newTaskTempFile(
taskContext: TaskAttemptContext, dir: Option[String], ext: String):
String = {
- // The file name looks like
part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
- // Note that %05d does not truncate the split number, so if we have more
than 100000 tasks,
+ // Use the Spark task attempt ID which is unique within the write job, so
that file writes never
+ // collide if the file name also includes job ID. The Hadoop task id is
equivalent to Spark's
+ // partitionId, which is not unique within the write job, for cases like
task retry or
+ // speculative tasks.
+ val taskId = TaskContext.get.taskAttemptId()
+ // The file name looks like
part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+ // Note that %05d does not truncate the taskId, so if we have more than
100000 tasks,
Review comment:
same, taskId -> taskAttemptId?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]