[ 
https://issues.apache.org/jira/browse/MAPREDUCE-6823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16203466#comment-16203466
 ] 

Steve Loughran commented on MAPREDUCE-6823:
-------------------------------------------

Patch 002

This is the mapreduce part of the latest HADOOP-13786 patch. It adds 

# The notion of a committer factory, which is given the job configuration and 
destination path and returns the committer to use for this job.
# The ability to declare a committer factory for 
{{org.apache.hadoop.mapreduce.lib.output.FileOutputFormat}} in the option 
{{mapreduce.outputcommitter.factory.class}}
# The ability to register the default committer to use for different 
filesystem schemes (searched for if {{mapreduce.outputcommitter.factory.class}} 
is unset). This lets you define different committers for s3a, wasb, etc.
# The default committer factory, which creates {{FileOutputCommitter}} instances
# Another committer factory for creating a named instance of 
{{PathOutputCommitter}} (see MAPREDUCE-6956), through the property 
{{mapreduce.outputcommitter.named.classname}}
# Static methods to make this straightforward to use
# {{FileOutputFormat}} wired up to use this factory mechanism.
# Tests for all this

As a result of this, you can now define a committer factory/committer class for 
file output on an explicit per-job or implicit *per-destination* basis. As 
these are subclasses of PathOutputCommitter, they don't have to go near the 
complexity of FileOutputFormat and its existing ls/rename/merge strategy, which 
is high performance and reliable for "real" filesystems, but not for object 
stores or other destinations.
Because the factories are used in FileOutputFormat, all file output formats 
which don't provide their own committer will get the feature too. Those which 
do provide their own committer (ParquetOutputFormat) don't pick up this new 
feature. It's left to applications to sort that out by calling the 
{{PathOutputCommitterFactory}} directly (which I have been doing downstream).

The per-destination logic is used in HADOOP-13786 for committer factories, the 
most interesting being the "DynamicCommitterFactory", which chooses the actual 
committer based on the settings of the destination bucket. That's why the 
complexity of config -> factory instance -> committer instance is important: it 
offloads the more complex decision making to the factories. The 
per-filesystem-scheme logic allows us to declare different factories for 
different filesystems, with the factories making the final call.

{code}
mapreduce.outputcommitter.factory.scheme.s3a  = 
org.apache.hadoop.fs.s3a.commit.DynamicCommitterFactory
{code}
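
As a rough illustration of the lookup order described above (explicit factory 
option first, then the per-scheme option, then the default), here is a minimal 
self-contained Java sketch. It is not the code in the patch: a plain {{Map}} 
stands in for the job configuration, and the names {{CommitterFactoryLookup}}, 
{{resolveFactoryClass}} and the {{DEFAULT_FACTORY}} placeholder are 
hypothetical. Only the two option names are taken from this comment.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/** Illustrative stand-in only; this is not the Hadoop implementation. */
final class CommitterFactoryLookup {
    // Option names as given in this comment.
    static final String FACTORY_CLASS = "mapreduce.outputcommitter.factory.class";
    static final String FACTORY_SCHEME_PREFIX = "mapreduce.outputcommitter.factory.scheme.";
    // Hypothetical placeholder for the factory that creates FileOutputCommitter instances.
    static final String DEFAULT_FACTORY = "DefaultFileOutputCommitterFactory";

    private CommitterFactoryLookup() {
    }

    /** Returns the factory class name to use for the given destination. */
    static String resolveFactoryClass(Map<String, String> conf, URI destination) {
        // 1. An explicit per-job factory always wins.
        String explicit = conf.get(FACTORY_CLASS);
        if (explicit != null && !explicit.trim().isEmpty()) {
            return explicit.trim();
        }
        // 2. Otherwise look for a factory registered for the destination's scheme.
        String scheme = destination.getScheme();
        if (scheme != null) {
            String perScheme = conf.get(FACTORY_SCHEME_PREFIX + scheme);
            if (perScheme != null && !perScheme.trim().isEmpty()) {
                return perScheme.trim();
            }
        }
        // 3. Fall back to the default factory.
        return DEFAULT_FACTORY;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(FACTORY_SCHEME_PREFIX + "s3a",
            "org.apache.hadoop.fs.s3a.commit.DynamicCommitterFactory");
        System.out.println(resolveFactoryClass(conf, URI.create("s3a://bucket/output")));
        System.out.println(resolveFactoryClass(conf, URI.create("hdfs://namenode/output")));
    }
}
```

In this sketch a destination whose scheme has no registered factory simply 
falls through to the default, which mirrors the "no impact on normal commit 
operations" behaviour described in this comment.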

At the same time, jobs can override this with their own decisions, especially 
with a simple factory which just instantiates a committer class from the 
property {{mapreduce.outputcommitter.named.classname}}. This gives the MRv2 
output formats the same feature that MRv1 has long had: the ability to declare 
a new output committer for file outputs.

{code}
mapreduce.outputcommitter.factory.class = 
org.apache.hadoop.mapreduce.lib.output.NamedCommitterFactory
mapreduce.outputcommitter.named.classname = YOUR-COMMITTER-CLASSNAME
{code}
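
To make the "just instantiates a committer class" idea concrete, here is a 
small self-contained Java sketch of creating a committer by reflection from 
that property. It is an assumption-laden illustration rather than the code in 
the patch: {{NamedCommitterSketch}}, the {{Committer}} interface and 
{{DemoCommitter}} are hypothetical stand-ins, a {{Map}} replaces the job 
configuration, and the sketch uses a no-argument constructor for simplicity, 
which may not match the constructor a real committer needs.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative stand-in only; this is not the Hadoop implementation. */
final class NamedCommitterSketch {
    // Option name as given in this comment.
    static final String NAMED_CLASSNAME = "mapreduce.outputcommitter.named.classname";

    private NamedCommitterSketch() {
    }

    /** Hypothetical stand-in for a committer type; not a Hadoop class. */
    interface Committer {
        String describe();
    }

    /** Hypothetical committer used only to exercise the lookup. */
    static final class DemoCommitter implements Committer {
        public DemoCommitter() {
        }

        @Override
        public String describe() {
            return "demo";
        }
    }

    /** Instantiates the committer class named in the configuration. */
    static Committer createNamedCommitter(Map<String, String> conf) {
        String name = conf.get(NAMED_CLASSNAME);
        if (name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException("No committer class set in " + NAMED_CLASSNAME);
        }
        try {
            // Load the class and check that it really is a committer before creating it.
            Class<? extends Committer> clazz =
                Class.forName(name.trim()).asSubclass(Committer.class);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException("Cannot create committer " + name, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(NAMED_CLASSNAME, DemoCommitter.class.getName());
        System.out.println(createNamedCommitter(conf).describe());
    }
}
```

Failing fast when the property is unset or names a class of the wrong type is 
a design choice of this sketch; it keeps a misconfigured job from silently 
falling back to a different committer.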

This does not have any impact on normal commit operations, as the default 
factory creates FileOutputCommitters, as before. It merely offers the ability 
to change committers & makes it easy to do this for specific filesystems.

This has now been tested all the way through Spark; with a change to its 
Parquet commit logic (SPARK-22217) and a special PathOutputCommitter, this 
feature lets us write data directly to S3 through Hadoop MR and Spark, with 
ORC, Parquet and other formats. It is also lined up to support any other 
committers/filesystem committers people write.

Testing: unit tests here, functional ITests in hadoop-aws, downstream tests 
elsewhere. 

Things to consider/refine

* Config option names?
* Best way to document. I've got an empty option in mapred-default.xml, 
otherwise it's all in the javadocs.
* Should the factory code all go into its own package, e.g. 
{{org.apache.hadoop.mapreduce.lib.output.factory}}?

> FileOutputFormat to support configurable PathOutputCommitter factory
> --------------------------------------------------------------------
>
>                 Key: MAPREDUCE-6823
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-6823
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: mrv2
>    Affects Versions: 3.0.0-alpha2
>         Environment: Targeting S3 as the output of work
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>         Attachments: HADOOP-13786-HADOOP-13345-001.patch, 
> MAPREDUCE-6823-002.patch
>
>
> In HADOOP-13786 I'm adding a custom subclass for FileOutputFormat, one which 
> can talk direct to the S3A Filesystem for more efficient operations, better 
> failure modes, and, most critically, as part of HADOOP-13345, atomic commit 
> of output. The normal committer relies on directory rename() being atomic for 
> this; for S3 we don't have that luxury.
> To support a custom committer, we need to be able to tell FileOutputFormat 
> (and implicitly, all subclasses which don't have their own custom committer), 
> to use our new {{S3AOutputCommitter}}.
> I propose: 
> # {{FileOutputFormat}} takes a factory to create committers.
> # The factory to take a URI and {{TaskAttemptContext}} and return a committer
> # the default implementation always returns a {{FileOutputCommitter}}
> # A configuration option allows a new factory to be named
> # An {{S3AOutputCommitterFactory}} to return a  {{FileOutputCommitter}} or 
> new {{S3AOutputCommitter}} depending upon the URI of the destination.
> Note that MRv1 already supports configurable committers; this is only the V2 
> API


