[ 
https://issues.apache.org/jira/browse/MAPREDUCE-7185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765979#comment-16765979
 ] 

Steve Loughran commented on MAPREDUCE-7185:
-------------------------------------------

Igor, which cloud infra was this? S3 has more fundamental issues than rename 
time, which is why we have the [zero rename 
committer|https://github.com/steveloughran/zero-rename-committer/releases] 
there. (I'm assuming GCS here, right?)

At the same time, yes, rename cost is high, especially on a store where the time to rename scales with the amount of data being moved.

We're only just starting to play with futures in the production source of Hadoop, where the fact that IOEs have to be caught and wrapped is a fundamental problem with the Java language.

# Can you use CompletableFuture<> over the simpler Future<>? It chains better.
# See org.apache.hadoop.util.LambdaUtils for some support.
# See org.apache.hadoop.fs.impl.FutureIOSupport for work on wrapping and unwrapping IOEs, with WrappedException being exclusively for IOEs, so it is straightforward to unwrap back to an IOE.
# The default number of threads should be 1, so on HDFS the behaviour is the same. Cloud deployments with a consistent store can do more.
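
The wrap/unwrap dance those points describe can be sketched without any Hadoop classes. This stand-alone version uses the JDK's UncheckedIOException where Hadoop's FutureIOSupport has its own WrappedException, and rename() here is a stub rather than the real FileSystem call:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class RenameFutures {

    // Stub for the real FileSystem.rename(); fails for "bad:" sources.
    public static boolean rename(String src, String dst) throws IOException {
        if (src.startsWith("bad:")) {
            throw new IOException("rename failed: " + src);
        }
        return true;
    }

    // Wrap the checked IOException so it can cross the lambda boundary;
    // Hadoop's own code uses WrappedException for the same purpose.
    public static CompletableFuture<Boolean> renameAsync(String src, String dst) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return rename(src, dst);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    // Wait for all renames, unwrapping back to the *original* IOException
    // so the failure type is preserved for callers.
    public static void awaitAll(List<CompletableFuture<Boolean>> futures)
            throws IOException {
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } catch (CompletionException e) {
            if (e.getCause() instanceof UncheckedIOException) {
                throw ((UncheckedIOException) e.getCause()).getCause();
            }
            throw e;
        }
    }
}
```

The key point is the last method: the caller sees the same IOException subclass the rename raised, not a generic wrapper.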


Regarding the patch: -1 as is.

 * IOEs must be extracted from the ExecutionException rather than wrapped in a generic "IOException", which loses the underlying failure class (making it hard for callers to interpret).
 * We're going to change failure modes such that more than one rename may fail simultaneously.
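
Extracting the IOE rather than re-wrapping it can be as small as this; awaitRename and its signature are illustrative, not from the patch:

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class UnwrapDemo {

    // Re-throw the future's failure as its original IOException rather than
    // wrapping it in a new generic IOException: a FileNotFoundException
    // stays a FileNotFoundException for the caller to act on.
    public static <T> T awaitRename(Future<T> fut)
            throws IOException, InterruptedException {
        try {
            return fut.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            // Last resort for truly unknown causes only.
            throw new IOException(cause);
        }
    }
}
```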



As Yetus says, "The patch doesn't appear to include any new or modified tests." 
We'll need those, including some to help test failures.


Test-wise:
* What happens if the thread count is negative? It should be OK.
* What if the file count is 0? In that case there isn't anything to rename at all. Again, my read of the code implies that's fine.
* You could see about adding something for S3A and AWS; the S3A tests could be 
skipped when S3Guard is disabled. 
* a subclassable committer test in mapreduce-examples or mapreduce-client is 
going to be needed here. Something for both the v1 and v2 algorithms.
* You effectively get that with HADOOP-16058: Terasort, which can be used against cloud infras. One for ABFS with a thread count > 1 would be nice.
* In org.apache.hadoop.fs.s3a.commit.staging.TestStagingCommitter we've got 
tests which use a mock FS to simulate and validate state. That could be used 
for this purpose too (i.e. add a new test for the classic committer, now with 
threads > 1); check it works, simulate failures.
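
The first two bullets are boundary conditions on the pool sizing and the file list. A stand-alone sketch of the behaviour being asked for; renameAll and the clamping rule are stand-ins for the patch's code, not quotes from it:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelRenameBounds {

    // Stand-in for the committer's rename loop: one task per file on a pool
    // whose size is clamped so a negative or zero configured value degrades
    // to today's single-threaded behaviour instead of blowing up.
    public static int renameAll(List<String> files, int configuredThreads)
            throws Exception {
        int threads = Math.max(1, configuredThreads);
        if (files.isEmpty()) {
            return 0;  // zero files: nothing to rename, no pool needed
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (String f : files) {
                pending.add(pool.submit(() -> { /* rename f here */ }));
            }
            for (Future<?> p : pending) {
                p.get();  // surface any failure
            }
            return pending.size();
        } finally {
            pool.shutdown();
        }
    }
}
```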

Minor details
* check your import ordering
* add some javadocs for the new options
* And something in the MR docs + mapreduce-default.xml. Secret settings benefit 
nobody except those who read the code line-by-line

Having done the work in HADOOP-13786, the code in that base FileOutputCommitter scares me, and the way the two commit algorithms are intermingled in something co-recursive is part of the reason. It's why I added a whole new plugin point for the object store committers.

h3. If you are going to go near that FileOutputCommitter, I'm going to want to see as rigorous a proof of correctness as you can come up with.

V2 commit with >1 task writing to the same paths is the key risk point: task A writes to /dest/_temp/_job1_att_1/task_a/data1 but task B writes /dest/_temp/_job1_att_1/task_b/data1/file2; the entry to commit, data1, is both a file from task A and a directory from task B. Things have to fail in meaningful ways there, and a generic "IOException" doesn't qualify.
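
That collision can be reproduced on a local filesystem with the JDK alone. This sketch (paths shortened, names mine) shows the second commit failing with a precise, inspectable exception type rather than a generic IOException:

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

public class CommitCollision {

    // Simulate task A committing a *file* named data1, then task B trying to
    // commit *under* a directory of the same name. Returns the exception
    // class the second commit fails with, or null if it wrongly succeeds.
    public static Class<?> collide(Path dest) throws IOException {
        Files.createDirectories(dest);
        Files.createFile(dest.resolve("data1"));             // task A: data1 is a file
        try {
            Files.createDirectories(dest.resolve("data1"));  // task B: data1 as a dir
            return null;
        } catch (FileAlreadyExistsException e) {
            return e.getClass();                             // precise failure type survives
        }
    }
}
```

A committer that reports this as FileAlreadyExistsException gives the job something to act on; one that rethrows a bare IOException does not.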

That zero-rename-committer doc is a best effort there, and its definition of "correctness" is the one I'll be reviewing this patch against. I think you could take that and cover this parallel-rename algorithm.

*important* this isn't me trying to dissuade you. I agree: this would be great for object stores with consistent listings but O(1) file renames, and guess what: the v2 algorithm effectively works a file at a time too. It's just that having this algorithm work correctly is critical to *everything* generating correct output.

Two extra points

# we do now have a plugin point to slot in new commit algorithms underneath any 
FileOutputFormat which doesn't subclass getOutputCommitter(); you do have the 
option of adding a whole new committer for your store, which I will worry 
slightly less about. If any change proposed to FileOutputCommitter downgrades 
the normal HDFS output algorithm in any way (including loss of exception info 
on failures), I'm going to say "do it in its own committer"

# having done the committer work, I think the v2 commit algorithm doesn't work properly: it handles failures badly, and in particular can't cope with partial failure of a committer during the abort phase; most executors, including Spark, aren't prepared for that outcome. I don't expect this one to be any better here, and with the parallelisation you can argue that the failure window against an object store is actually reduced. But I advocate using v1 for safety, so make sure that doesn't downgrade; as for v2, well, try not to make it worse.




> Parallelize part files move in FileOutputCommitter
> --------------------------------------------------
>
>                 Key: MAPREDUCE-7185
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7185
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>    Affects Versions: 3.2.0, 2.9.2
>            Reporter: Igor Dvorzhak
>            Assignee: Igor Dvorzhak
>            Priority: Major
>         Attachments: MAPREDUCE-7185.patch
>
>
> If map task outputs multiple files it could be slow to move them from temp 
> directory to output directory in object stores (GCS, S3, etc).
> To improve performance we need to parallelize move of more than 1 file in 
> FileOutputCommitter.
> Repro:
>  Start spark-shell:
> {code}
> spark-shell --num-executors 2 --executor-memory 10G --executor-cores 4 --conf 
> spark.dynamicAllocation.maxExecutors=2
> {code}
> From spark-shell:
> {code}
> val df = (1 to 10000).toList.toDF("value").withColumn("p", $"value" % 
> 10).repartition(50)
> df.write.partitionBy("p").mode("overwrite").format("parquet").options(Map("path"
>  -> s"gs://some/path")).saveAsTable("parquet_partitioned_bench")
> {code}
> With the fix execution time reduces from 130 seconds to 50 seconds.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
