[
https://issues.apache.org/jira/browse/MAPREDUCE-7185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765979#comment-16765979
]
Steve Loughran commented on MAPREDUCE-7185:
-------------------------------------------
Igor, which cloud infra was this? S3 has more fundamental issues than rename
time, which is why we have the [zero rename
committer|https://github.com/steveloughran/zero-rename-committer/releases]
there. (I'm assuming GCS here, right?)
At the same time, yes, rename cost is high, especially on a store where the time to
rename a file is proportional to the amount of data being moved.
We're only just starting to play with futures in the production source of
Hadoop, where the fact that IOEs have to be caught and wrapped is a fundamental
problem with the Java language.
# Can you use CompletableFuture<> over the simpler Future<>? It chains better
# see org.apache.hadoop.util.LambdaUtils for some support
# and org.apache.hadoop.fs.impl.FutureIOSupport for work on wrapping and
unwrapping IOEs, with WrappedException being exclusively for IOEs, so it is
straightforward to unwrap back to an IOE (see the sketch after this list).
# the default number of threads should be 1, so on HDFS the behaviour is the same.
Cloud deployments with a consistent store can do more.
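To make that concrete, here is a minimal sketch of the pattern, assuming a per-file rename submitted to a pool. It uses plain JDK wrapping (UncheckedIOException) rather than the Hadoop helpers named above, and the renameAsync/awaitAll names are mine, not anything in the patch:
{code}
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ParallelRenameSketch {

  /** Schedule one rename; the checked IOE is wrapped so it can cross the lambda. */
  static CompletableFuture<Void> renameAsync(FileSystem fs, Path src, Path dst,
      ExecutorService pool) {
    return CompletableFuture.runAsync(() -> {
      try {
        if (!fs.rename(src, dst)) {
          throw new UncheckedIOException(
              new IOException("Failed to rename " + src + " to " + dst));
        }
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }, pool);
  }

  /** Wait for all renames, unwrapping back to the original IOException. */
  static void awaitAll(List<CompletableFuture<Void>> futures) throws IOException {
    try {
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    } catch (CompletionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof UncheckedIOException) {
        throw ((UncheckedIOException) cause).getCause();  // the original IOE
      }
      if (cause instanceof IOException) {
        throw (IOException) cause;
      }
      throw e;
    }
  }
}
{code}
The executor feeding this would be sized from the new option, defaulting to 1 so the HDFS behaviour stays as it is today.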
Regarding the patch: -1 as is.
* IOEs must be extracted from the ExecutionException, rather than wrapped in a
generic "IOException" which loses the underlying failure type (making it
hard for callers to interpret); see the sketch below
* We're going to change failure modes such that more than one rename may fail
simultaneously, so the error handling needs to be ready for that.
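By "extracted" I mean keeping the original exception type when the future fails, roughly like this (a sketch against a plain java.util.concurrent.Future, not the patch's actual code):
{code}
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class RenameFutureUnwrap {

  /** Block for one rename future, surfacing the original IOException. */
  static void waitForRename(Future<Boolean> future) throws IOException {
    try {
      if (!future.get()) {
        throw new IOException("rename() returned false");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      InterruptedIOException ioe =
          new InterruptedIOException("interrupted waiting for rename");
      ioe.initCause(e);
      throw ioe;
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof IOException) {
        throw (IOException) cause;     // rethrow the real failure, type intact
      }
      throw new IOException(cause);    // anything else: keep it as the cause
    }
  }
}
{code}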
As Yetus says, "The patch doesn't appear to include any new or modified tests."
We'll need those, including some to help test failures.
Test wise:
* what happens if the thread count is negative? It should be OK (a sketch of the
kind of test I mean follows this list).
* what if the file length is 0? In that case, there isn't anything to rename at
all. Again, my read of the code implies that's fine.
* You could see about adding something for S3A and AWS; the S3A tests could be
skipped when S3Guard is disabled.
* a subclassable committer test in mapreduce-examples or mapreduce-client is
going to be needed here. Something for both the v1 and v2 algorithms.
* You effectively get that with HADOOP-16058: Terasort, which can be used
against cloud infras. One for ABFS with a thread count > 1 would be nice.
* In org.apache.hadoop.fs.s3a.commit.staging.TestStagingCommitter we've got
tests which use a mock FS to simulate and validate state. That could be used
for this purpose too (i.e. add a new test for the classic committer, now with
threads > 1); check it works, simulate failures.
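For the unit test side, something along these lines is the shape I'm after; the config key below is a placeholder of my own, use whatever name the patch actually defines:
{code}
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.junit.Test;

public class TestParallelFileOutputCommitter {

  // placeholder: use whatever key the patch introduces
  private static final String RENAME_THREADS =
      "mapreduce.fileoutputcommitter.rename.threads";

  @Test
  public void testNegativeThreadCountAndEmptyFile() throws Exception {
    Configuration conf = new Configuration();
    conf.setInt(RENAME_THREADS, -1);   // must degrade to single-threaded, not fail
    Path out = new Path(System.getProperty("java.io.tmpdir"), "parallel-commit-test");

    TaskAttemptID tid =
        TaskAttemptID.forName("attempt_200707121733_0001_m_000000_0");
    TaskAttemptContext tac = new TaskAttemptContextImpl(conf, tid);
    FileOutputCommitter committer = new FileOutputCommitter(out, tac);

    FileSystem fs = out.getFileSystem(conf);
    fs.delete(out, true);
    committer.setupJob(tac);
    committer.setupTask(tac);

    // a zero-byte part file still has to end up in the destination
    fs.createNewFile(new Path(committer.getWorkPath(), "part-m-00000"));

    committer.commitTask(tac);
    committer.commitJob(tac);
    assertTrue(fs.exists(new Path(out, "part-m-00000")));
  }
}
{code}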
Minor details
* check your import ordering
* add some javadocs for the new options
* And something in the MR docs + mapreduce-default.xml. Secret settings benefit
nobody except those who read the code line-by-line.
Having done the work in HADOOP-13786, the code in that base FileOutputCommitter
scares me, and how the two commit algorithms are intermingled in something
co-recursive is part of the reason. It's why I added a whole new plugin point
for the object store committers.
h3. If you are going to go near that FileOutputCommitter, I'm going to want to
see as rigorous a proof of correctness as you can come up with.
V2 commit with >1 task writing to the same paths is the key risk point: task A
writes to /dest/_temp/_job1_att_1/task_a/data1 but task B writes
/dest/_temp/_job1_att_1/task_b/data1/file2; the file to commit, data1, is
both a file from task A and a dir from task B. Things have to fail in
meaningful ways there, and a generic "IOException" doesn't qualify.
That zero-rename-committer doc is a best effort there, and its definition of
"correctness" is the one I'll be reviewing this patch on. I think you could
take that and cover this parallel-rename algorithm.
*important* this isn't me trying to dissuade you; I agree, this would be great
for object stores with consistent listings but O(1) file renames, and guess
what: the v2 algorithm effectively works a file at a time too. It's just that
having this algorithm work correctly is critical to *everything* generating
correct output.
Two extra points
# we do now have a plugin point to slot in new commit algorithms underneath any
FileOutputFormat which doesn't subclass getOutputCommitter(); you do have the
option of adding a whole new committer for your store, which I will worry
slightly less about (see the sketch after these points). If any change proposed
to FileOutputCommitter downgrades the normal HDFS output algorithm in any way
(including loss of exception info on failures), I'm going to say "do it in its
own committer"
# having done the committer work, I think the v2 commit algorithm doesn't work
properly: it handles failures badly, and in particular can't cope with partial
failure of a committer during the abort phase; most executors, including
Spark, aren't prepared for that outcome. I don't expect this one to be any
better here, and with the parallelisation you can argue that the failure window
against an object store is actually reduced. But I advocate using v1 for
safety, so make sure that doesn't downgrade, and as for v2, well, try not to
make it worse.
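For reference, hooking a new committer in through that plugin point looks roughly like this; ParallelRenameCommitter here is a hypothetical committer of your own, not something that exists:
{code}
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;

/**
 * Sketch of a factory binding a store-specific committer in underneath
 * FileOutputFormat, leaving FileOutputCommitter itself alone.
 */
public class ParallelRenameCommitterFactory extends PathOutputCommitterFactory {

  @Override
  public PathOutputCommitter createOutputCommitter(Path outputPath,
      TaskAttemptContext context) throws IOException {
    return new ParallelRenameCommitter(outputPath, context);
  }

  /** Hypothetical committer; the parallel-rename commit logic would live here. */
  public static class ParallelRenameCommitter extends FileOutputCommitter {
    public ParallelRenameCommitter(Path outputPath, TaskAttemptContext context)
        throws IOException {
      super(outputPath, context);
    }
    // override commitTask()/commitJob() with the parallelised renames
  }
}
{code}
It then gets bound per filesystem scheme, e.g. setting mapreduce.outputcommitter.factory.scheme.gs to the factory class, the same way the S3A committers are bound to s3a.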
> Parallelize part files move in FileOutputCommitter
> --------------------------------------------------
>
> Key: MAPREDUCE-7185
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-7185
> Project: Hadoop Map/Reduce
> Issue Type: Improvement
> Affects Versions: 3.2.0, 2.9.2
> Reporter: Igor Dvorzhak
> Assignee: Igor Dvorzhak
> Priority: Major
> Attachments: MAPREDUCE-7185.patch
>
>
> If a map task outputs multiple files, it can be slow to move them from the temp
> directory to the output directory in object stores (GCS, S3, etc.).
> To improve performance we need to parallelize the move of more than one file in
> FileOutputCommitter.
> Repro:
> Start spark-shell:
> {code}
> spark-shell --num-executors 2 --executor-memory 10G --executor-cores 4 --conf spark.dynamicAllocation.maxExecutors=2
> {code}
> From spark-shell:
> {code}
> val df = (1 to 10000).toList.toDF("value").withColumn("p", $"value" % 10).repartition(50)
> df.write.partitionBy("p").mode("overwrite").format("parquet").options(Map("path" -> s"gs://some/path")).saveAsTable("parquet_partitioned_bench")
> {code}
> With the fix execution time reduces from 130 seconds to 50 seconds.