[
https://issues.apache.org/jira/browse/MAPREDUCE-7029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16331068#comment-16331068
]
Karthik Palaniappan commented on MAPREDUCE-7029:
------------------------------------------------
Directory rename is indeed not atomic. IIRC, GCS did not write a separate
output committer because the cost of rename is tolerable: O(files), compared
to S3's O(data-per-file * files).
Correct me if I'm wrong, but I don't think FileOutputCommitter actually requires
atomicity.
1) Task commit is already non-atomic: commitTask() calls mergePaths(), which is
essentially a recursive copy of the attempt directory, *not* an atomic rename
of the attempt directory. Even so, if the output files have the same
names across different task attempts (e.g. speculative execution), this is
still okay, as later attempts will just overwrite earlier attempts' files with
the same contents.
2) Job commit is marked by a _SUCCESS file, so it's okay if the directory
rename is non-atomic.
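To illustrate the two points above, here is a minimal sketch on a local
filesystem, using java.nio.file rather than the Hadoop FileSystem API. The
names mergeInto, commitJob and isCommitted are hypothetical, and this is not
Hadoop's actual mergePaths() code. It only shows why re-merging an attempt
with identical file names is harmless, and why a reader can key off the
_SUCCESS marker instead of relying on an atomic rename.

```scala
import java.io.File
import java.nio.file.{Files, Path, StandardCopyOption}

object CommitSketch {
  // Recursively move every file under `from` into `to`, replacing any file
  // that already exists. Each file moves on its own, so the merge as a whole
  // is not atomic. (Assumption: `to` is never an existing plain file when
  // `from` is a directory.)
  def mergeInto(from: Path, to: Path): Unit = {
    if (Files.isDirectory(from)) {
      Files.createDirectories(to)
      val children = Option(from.toFile.listFiles()).getOrElse(Array.empty[File])
      children.foreach(c => mergeInto(c.toPath, to.resolve(c.getName)))
    } else {
      Files.move(from, to, StandardCopyOption.REPLACE_EXISTING)
    }
  }

  // Job commit: the marker is written last, after all task output is in place.
  def commitJob(out: Path): Unit = {
    Files.write(out.resolve("_SUCCESS"), new Array[Byte](0))
  }

  // Readers treat the output as complete only once the marker exists.
  def isCommitted(out: Path): Boolean = Files.exists(out.resolve("_SUCCESS"))

  def main(args: Array[String]): Unit = {
    val root = Files.createTempDirectory("commit-sketch")
    val out = root.resolve("out")
    // Two attempts of the same task write the same file name and contents.
    for (attempt <- Seq("attempt_0", "attempt_1")) {
      val dir = Files.createDirectories(root.resolve(attempt))
      Files.write(dir.resolve("part-00000"), "same contents".getBytes("UTF-8"))
      mergeInto(dir, out) // the second merge just overwrites the first
    }
    println(isCommitted(out)) // false: files are present, job not committed
    commitJob(out)
    println(isCommitted(out)) // true
  }
}
```

In this sketch a reader that checks isCommitted() never sees a half-merged
directory as finished output, which is the property that matters here.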
That being said, I agree that in the long term, parts of the Hadoop ecosystem
that assume POSIX-ish directory semantics should have different implementations
for object stores. This is not limited to OutputCommitter.
> FileOutputCommitter is slow on filesystems lacking recursive delete
> -------------------------------------------------------------------
>
> Key: MAPREDUCE-7029
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-7029
> Project: Hadoop Map/Reduce
> Issue Type: Improvement
> Affects Versions: 2.8.2
> Environment: - Google Cloud Storage (with the GCS connector:
> https://github.com/GoogleCloudPlatform/bigdata-interop/tree/master/gcs) for
> HCFS compatibility.
> - FileOutputCommitter algorithm v2.
> - Running on Google Compute Engine with Java 8, Debian 8, Hadoop 2.8.2, Spark
> 2.2.0.
> Reporter: Karthik Palaniappan
> Assignee: Karthik Palaniappan
> Priority: Minor
> Fix For: 3.1.0, 2.10.0
>
> Attachments: MAPREDUCE-7029-branch-2.004.patch,
> MAPREDUCE-7029-branch-2.005.patch, MAPREDUCE-7029-branch-2.005.patch,
> MAPREDUCE-7029.001.patch, MAPREDUCE-7029.002.patch, MAPREDUCE-7029.003.patch,
> MAPREDUCE-7029.004.patch, MAPREDUCE-7029.005.patch
>
>
> I ran a Spark job that outputs thousands of Parquet files (i.e. there are
> thousands of reducers), and it hung for several minutes in the driver after
> all tasks were complete. Here is a very simple repro of the job (to be run in
> a spark-shell):
> {code:scala}
> spark.range(1L << 20).repartition(1 << 14).write.save("gs://some/path")
> {code}
> Spark actually calls into MapReduce's FileOutputCommitter. Job committing
> (specifically cleanupJob()) recursively deletes the job temporary directory,
> which is something like "gs://some/path/_temporary". If I understand
> correctly, on HDFS, this would be O(1), but on GCS (and every HCFS I know),
> this requires a full file tree walk. Deleting tens of thousands of objects in
> GCS takes several minutes.
> I propose that commitTask() recursively delete its task attempt temp
> directory (something like "gs://some/path/_temporary/attempt1/task1"). On
> HDFS, this is O(1) per task, so this is very little overhead per task. On GCS
> (and other HCFSs), this adds parallelism for deleting the job temp directory.
> With the attached patch, the repro above went from taking ~10 minutes to
> taking ~5 minutes, and task time did not significantly change.
> Side note: I found this issue with Spark, but I assume it applies to a
> MapReduce job with thousands of reducers as well.
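
The proposal in the quoted description can be sketched roughly as follows,
with local directories and Scala Futures standing in for object-store paths
and concurrently running tasks. The name deleteRecursively and the directory
layout are hypothetical, and this is not the attached patch. It only shows the
shape of the idea: each task removes its own attempt directory, so job cleanup
has almost nothing left to walk.

```scala
import java.io.File
import java.nio.file.{Files, Path}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelCleanupSketch {
  // Delete a tree by walking it: one delete call per entry, as on a store
  // without a native recursive delete.
  def deleteRecursively(path: Path): Unit = {
    val children = Option(path.toFile.listFiles()).getOrElse(Array.empty[File])
    children.foreach(c => deleteRecursively(c.toPath))
    Files.deleteIfExists(path)
  }

  def main(args: Array[String]): Unit = {
    val tmp = Files.createTempDirectory("job").resolve("_temporary")
    val taskDirs = (0 until 8).map { i =>
      val d = Files.createDirectories(tmp.resolve(s"task_$i"))
      Files.write(d.resolve("part"), Array[Byte](1))
      d
    }
    // Each "task" deletes its own attempt directory as part of its commit,
    // so the deletes run in parallel instead of serially in the driver.
    val perTask = Future.traverse(taskDirs)(d => Future(deleteRecursively(d)))
    Await.result(perTask, 30.seconds)
    // Job cleanup is left with only the now-empty parent directory.
    deleteRecursively(tmp)
    println(Files.exists(tmp)) // false
  }
}
```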