cloud-fan commented on a change in pull request #33002:
URL: https://github.com/apache/spark/pull/33002#discussion_r655506168
##########
File path:
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
##########
@@ -134,7 +134,7 @@ class PathOutputCommitProtocol(
val parent = dir.map {
d => new Path(workDir, d)
}.getOrElse(workDir)
- val file = new Path(parent, getFilename(taskContext, ext))
+ val file = new Path(parent, getFilename(ext))
Review comment:
I'm not sure whether this change affects this new committer, but I think it
should be a positive change. The file name now uses the task attempt ID instead
of the partition ID, which is "more unique".
@steveloughran
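To illustrate the "more unique" point, here is a minimal sketch in plain Scala, with no Spark dependency. `Attempt` and `FileNameCollision` are hypothetical names made up for this illustration; only the two format strings mirror the old and new lines of the diff. It assumes a speculative attempt reuses the partition ID but gets its own task attempt ID.

```scala
// Hypothetical stand-in for a task attempt; this is not a Spark class.
final case class Attempt(partitionId: Int, taskAttemptId: Long)

object FileNameCollision {
  // Old scheme from the diff: keyed on the partition ID (the Hadoop task ID).
  def byPartition(a: Attempt, jobId: String, ext: String): String =
    f"part-${a.partitionId}%05d-$jobId$ext"

  // New scheme from the diff: keyed on the Spark task attempt ID.
  def byAttempt(a: Attempt, jobId: String, ext: String): String =
    f"part-${a.taskAttemptId}%05d-$jobId$ext"
}
```

With an original attempt `Attempt(0, 7L)` and a speculative attempt `Attempt(0, 12L)` of the same partition, `byPartition` produces the same name for both, while `byAttempt` produces two distinct names.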
##########
File path:
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##########
@@ -152,12 +153,21 @@ class HadoopMapReduceCommitProtocol(
tmpOutputPath
}
- protected def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
- // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
- // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
+ protected def getFilename(ext: String): String = {
+ // Use the Spark task attempt ID which is unique within the write job, so that file writes never
+ // collide if the file name also includes job ID. The Hadoop task id is equivalent to Spark's
+ // partitionId, which is not unique within the write job, for cases like task retry or
+ // speculative tasks.
+ // NOTE: this is not necessary for certain Hadoop output committers, as they will create a
+ // unique staging directory for each task attempt, so we don't need to worry about file name
+ // collision between different task attempts, and using Hadoop task ID/Spark partition ID is
+ // also fine. For extra safety and consistency with the streaming side, we always use the
+ // Spark task attempt ID here.
+ val taskId = TaskContext.get.taskAttemptId()
+ // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+ // Note that %05d does not truncate the taskId, so if we have more than 100000 tasks,
// the file name is fine and won't overflow.
- val split = taskContext.getTaskAttemptID.getTaskID.getId
- f"part-$split%05d-$jobId$ext"
+ f"part-$taskId%05d-$jobId$ext"
Review comment:
A more aggressive way is to simply use a fresh UUID here, but I'm not
sure if that's better. cc @zsxwing
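For reference, one possible reading of the UUID alternative, as a sketch only. `UuidFileName` and its `fileName` helper are hypothetical; this version assumes the task ID component is replaced by a random UUID while the job ID and extension are kept, which the comment above does not spell out.

```scala
import java.util.UUID

object UuidFileName {
  // Hypothetical variant: a fresh random UUID per file instead of a task ID.
  // Keeping the job ID in the name is an assumption of this sketch.
  def fileName(jobId: String, ext: String): String =
    s"part-${UUID.randomUUID()}-$jobId$ext"
}
```

Every call yields a different name, so attempts cannot collide, but the name no longer says which task wrote the file.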
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
##########
@@ -113,12 +113,15 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
override def newTaskTempFile(
taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
- // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
- // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
+ // Use the Spark task attempt ID which is unique within the write job, so that file writes never
+ // collide if the file name also includes job ID. The Hadoop task id is equivalent to Spark's
+ // partitionId, which is not unique within the write job, for cases like task retry or
+ // speculative tasks.
+ val taskId = TaskContext.get.taskAttemptId()
+ // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+ // Note that %05d does not truncate the taskId, so if we have more than 100000 tasks,
// the file name is fine and won't overflow.
- val split = taskContext.getTaskAttemptID.getTaskID.getId
- val uuid = UUID.randomUUID.toString
- val filename = f"part-$split%05d-$uuid$ext"
Review comment:
Currently the streaming side puts a fresh UUID in each file name to avoid name
collisions, but I think that's overkill: "task attempt ID + job ID" also avoids
name collisions and is more consistent with the batch side.
It may also be useful to include the job ID in the file name, as the batch
side does, so that people can see which files were written by the same job.
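As a rough sketch of what having the job ID in the name buys: files can be grouped by job from a plain listing. `JobIdGrouping` is a hypothetical helper, and it assumes names of the form `part-<taskId>-<jobId><ext>` with a UUID-shaped job ID, as in the example name in the code comment.

```scala
object JobIdGrouping {
  // Assumed name shape: part-<taskId>-<jobId><ext>, where jobId looks like a UUID.
  private val PartFile =
    """part-(\d+)-([0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12})(.*)""".r

  // Extracts the job ID, or None for names that do not match the assumed shape.
  def jobIdOf(name: String): Option[String] = name match {
    case PartFile(_, jobId, _) => Some(jobId)
    case _ => None
  }

  // Groups the matching file names by the job ID embedded in them.
  def groupByJob(names: Seq[String]): Map[String, Seq[String]] =
    names
      .flatMap(n => jobIdOf(n).map(id => id -> n))
      .groupBy(_._1)
      .map { case (id, pairs) => id -> pairs.map(_._2) }
}
```

Names that do not follow the assumed shape, such as `_SUCCESS`, are simply left out of the result.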
##########
File path:
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##########
@@ -152,12 +153,21 @@ class HadoopMapReduceCommitProtocol(
tmpOutputPath
}
- protected def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
- // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
- // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
+ protected def getFilename(ext: String): String = {
+ // Use the Spark task attempt ID which is unique within the write job, so that file writes never
+ // collide if the file name also includes job ID. The Hadoop task id is equivalent to Spark's
+ // partitionId, which is not unique within the write job, for cases like task retry or
+ // speculative tasks.
+ // NOTE: this is not necessary for certain Hadoop output committers, as they will create a
+ // unique staging directory for each task attempt, so we don't need to worry about file name
+ // collision between different task attempts, and using Hadoop task ID/Spark partition ID is
+ // also fine. For extra safety and consistency with the streaming side, we always use the
+ // Spark task attempt ID here.
+ val taskId = TaskContext.get.taskAttemptId()
+ // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+ // Note that %05d does not truncate the taskId, so if we have more than 100000 tasks,
// the file name is fine and won't overflow.
- val split = taskContext.getTaskAttemptID.getTaskID.getId
- f"part-$split%05d-$jobId$ext"
+ f"part-$taskId%05d-$jobId$ext"
Review comment:
The value will be very different. Within one query, the partition ID always
starts at 0, but the task attempt ID is unique within a Spark application and
is not reset for a new query.
If we do want to keep the `part-00000` prefix for some reason, we can also
apply the naming rule from streaming to batch. I don't know who would care about
the final naming. The commit protocols I'm aware of only care about file
listing.
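A small sketch of the numbering difference, in plain Scala rather than Spark. `AttemptIdNumbering.App` is a hypothetical model: it assumes an application-wide counter that hands out one attempt ID per task and is never reset between queries, with no retries. The `prefix` helper shows that `%05d` pads to at least five digits and does not truncate larger values.

```scala
import java.util.concurrent.atomic.AtomicLong

object AttemptIdNumbering {
  // Hypothetical model of one application: attempt IDs come from a single
  // counter shared by all queries, while partition IDs restart at 0 per query.
  final class App {
    private val nextAttemptId = new AtomicLong(0L)

    // Returns (partitionId, taskAttemptId) pairs for one query, assuming no retries.
    def runQuery(numPartitions: Int): Seq[(Int, Long)] =
      (0 until numPartitions).map(p => (p, nextAttemptId.getAndIncrement()))
  }

  // %05d sets a minimum width of five digits; wider values are printed in full.
  def prefix(id: Long): String = f"part-$id%05d"
}
```

In this model, a first query with three partitions gets attempt IDs 0, 1, 2, and a second query with two partitions gets 3 and 4, so its first file would start with `part-00003` rather than `part-00000`.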