[ https://issues.apache.org/jira/browse/MAPREDUCE-6823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16203466#comment-16203466 ]
Steve Loughran commented on MAPREDUCE-6823:
-------------------------------------------
Patch 002
This is the MapReduce part of the latest HADOOP-13786 patch. It adds:
# The notion of a committer factory, which is given the job configuration and
destination path and returns the committer to use for this job.
# The ability to declare a committer factory for
{{org.apache.hadoop.mapreduce.lib.output.FileOutputFormat}} in the option
{{mapreduce.outputcommitter.factory.class}}
# The ability to register the default committer factory to use for different
filesystem schemes (searched for if {{mapreduce.outputcommitter.factory.class}}
is unset). This lets you define different committers for s3a, wasb, etc.
# The default committer factory, for creating {{FileOutputCommitter}} instances.
# Another committer factory for creating a named instance of
{{PathOutputCommitter}} (see MAPREDUCE-6956), through the property
{{mapreduce.outputcommitter.named.classname}}.
# Static methods to make this straightforward to use
# {{FileOutputFormat}} wired up to use this factory mechanism.
# Tests for all this
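The lookup order described in the list above (explicit per-job factory first, then a factory registered for the destination's scheme, then the default) can be sketched in plain Java. This is a minimal model under stated assumptions, not the patch's code: the job configuration is modelled as a {{Map}}, the class {{CommitterFactoryResolver}} and its {{DEFAULT_FACTORY}} marker are hypothetical, and only the two option names come from this comment.

```java
import java.net.URI;
import java.util.Map;

// Hypothetical sketch of the factory lookup order. Only the option names are
// taken from the comment; the configuration is modelled as a plain Map.
final class CommitterFactoryResolver {
    static final String FACTORY_CLASS_KEY = "mapreduce.outputcommitter.factory.class";
    static final String SCHEME_KEY_PREFIX = "mapreduce.outputcommitter.factory.scheme.";
    // Hypothetical marker standing in for the default FileOutputCommitter factory.
    static final String DEFAULT_FACTORY = "<default FileOutputCommitter factory>";

    private CommitterFactoryResolver() {}

    // Returns the name of the factory to use for output written to dest.
    static String resolve(URI dest, Map<String, String> conf) {
        // 1. An explicit per-job factory always wins.
        String explicit = conf.get(FACTORY_CLASS_KEY);
        if (explicit != null && !explicit.trim().isEmpty()) {
            return explicit.trim();
        }
        // 2. Otherwise, look for a factory registered for the destination's scheme.
        String scheme = dest.getScheme();
        if (scheme != null) {
            String perScheme = conf.get(SCHEME_KEY_PREFIX + scheme);
            if (perScheme != null && !perScheme.trim().isEmpty()) {
                return perScheme.trim();
            }
        }
        // 3. Fall back to the default factory.
        return DEFAULT_FACTORY;
    }
}
```

Returning a factory name rather than an instance keeps the sketch short; real code would go on to load and instantiate the named class.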
As a result, you can now define a committer factory or committer class for
file output on an explicit per-job or implicit *per-destination* basis. As
these committers are subclasses of PathOutputCommitter, they don't have to go
near the complexity of FileOutputCommitter and its existing ls/rename/merge
strategy, which is high-performance and reliable on "real" filesystems, but not
on object stores or other destinations without atomic directory rename.
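Why a rename-based commit suits "real" filesystems but not object stores can be shown with a toy model. Assumption: the store is a flat key space in which renaming a directory means copying and then deleting each object under its prefix; {{ToyObjectStore}} is hypothetical and is not S3A code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model of an object store: a flat key -> contents map with no real
// directories. Not S3A code; it only illustrates the failure mode.
final class ToyObjectStore {
    final TreeMap<String, String> objects = new TreeMap<>();

    // "Renames" directory src to dst the only way a flat key space allows:
    // copy every object under the src prefix, then delete the original.
    // failAfter simulates the client dying after that many objects have been
    // moved; pass a negative value for a run with no failure.
    int renameDir(String src, String dst, int failAfter) {
        // All keys that start with src (assumes no key contains Character.MAX_VALUE).
        List<String> keys =
            new ArrayList<>(objects.subMap(src, src + Character.MAX_VALUE).keySet());
        int moved = 0;
        for (String key : keys) {
            if (moved == failAfter) {
                throw new IllegalStateException("client failed after " + moved + " objects");
            }
            objects.put(dst + key.substring(src.length()), objects.get(key));
            objects.remove(key);
            moved++;
        }
        return moved;
    }
}
```

If the client fails part-way through, some output is already under the destination while the rest is still under the source: exactly the intermediate state that an atomic directory rename never exposes.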
Because the factories are used in FileOutputFormat, all file output formats
that don't provide their own committer get the feature too. Those that do
provide their own committer (such as ParquetOutputFormat) don't pick up this
new feature; it's left to applications to sort that out by calling
{{PathOutputCommitterFactory}} directly (which I have been doing downstream).
The per-destination logic is used in HADOOP-13786 for committer factories, the
most interesting being the "DynamicCommitterFactory", which chooses the actual
committer based on the settings of the destination bucket. That's why the chain
of config -> factory instance -> committer instance is important: it offloads
the more complex decision-making to the factories. The per-filesystem-scheme
logic lets us declare different factories for different filesystems, with the
factories making the final call:
{code}
mapreduce.outputcommitter.factory.scheme.s3a =
org.apache.hadoop.fs.s3a.commit.DynamicCommitterFactory
{code}
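The config -> factory instance -> committer instance chain can be illustrated with a hypothetical "dynamic" factory that defers the final choice to per-bucket settings. This is a sketch only: {{SketchCommitter}}, {{SketchCommitterFactory}}, {{SketchDynamicFactory}} and both option names ({{example.committer.name}} and {{example.bucket.<bucket>.committer.name}}) are made up for illustration, and it does not claim to reproduce how the HADOOP-13786 DynamicCommitterFactory works.

```java
import java.net.URI;
import java.util.Map;

// Hypothetical committer abstraction standing in for PathOutputCommitter.
interface SketchCommitter {
    String describe();
}

// Hypothetical factory abstraction: configuration picks the factory,
// and the factory makes the final choice of committer.
interface SketchCommitterFactory {
    SketchCommitter createCommitter(URI dest, Map<String, String> conf);
}

// Hypothetical "dynamic" factory: looks up a per-bucket setting for the
// destination, then a global default. Both option names are made up.
final class SketchDynamicFactory implements SketchCommitterFactory {
    static final String DEFAULT_KEY = "example.committer.name";

    static String bucketKey(String bucket) {
        return "example.bucket." + bucket + ".committer.name";
    }

    @Override
    public SketchCommitter createCommitter(URI dest, Map<String, String> conf) {
        String fallback = conf.getOrDefault(DEFAULT_KEY, "file");
        String bucket = dest.getHost(); // for s3a://bucket/path this is the bucket
        String choice = bucket == null ? fallback : conf.getOrDefault(bucketKey(bucket), fallback);
        switch (choice) {
            case "file":
                return () -> "rename-based committer";
            case "object-store":
                return () -> "object-store-aware committer";
            default:
                throw new IllegalArgumentException("Unknown committer: " + choice);
        }
    }
}
```

The extra level of indirection means a single per-scheme setting can route different buckets to different committers without every job needing to know the details.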
At the same time, jobs can override this with their own decisions, especially
with a simple factory that just instantiates the committer class named in the
property {{mapreduce.outputcommitter.named.classname}}. This gives the MRv2
output formats the same feature that MRv1 has long had: the ability to declare
a new output committer for file output:
{code}
mapreduce.outputcommitter.factory.class =
org.apache.hadoop.mapreduce.lib.output.NamedCommitterFactory
mapreduce.outputcommitter.named.classname = YOUR-COMMITTER-CLASSNAME
{code}
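At its simplest, a named-committer factory loads a class by name and instantiates it. The sketch below assumes a no-argument constructor and a hypothetical {{NamedSketchCommitter}} interface in place of PathOutputCommitter (real committers take constructor arguments); {{ExampleCommitter}} and {{SketchNamedCommitterFactory}} are also hypothetical, and only the {{mapreduce.outputcommitter.named.classname}} option comes from this comment.

```java
import java.util.Map;

// Hypothetical committer type standing in for PathOutputCommitter.
interface NamedSketchCommitter {
    String id();
}

// Hypothetical committer that a job might name in its configuration.
class ExampleCommitter implements NamedSketchCommitter {
    public ExampleCommitter() {}

    @Override
    public String id() {
        return "example";
    }
}

// Hypothetical "named" factory: instantiate whatever class the job names.
// A no-arg constructor is used only to keep the example self-contained.
final class SketchNamedCommitterFactory {
    static final String CLASSNAME_KEY = "mapreduce.outputcommitter.named.classname";

    static NamedSketchCommitter create(Map<String, String> conf) {
        String name = conf.get(CLASSNAME_KEY);
        if (name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException("No committer class set in " + CLASSNAME_KEY);
        }
        try {
            // asSubclass throws ClassCastException for classes that are not committers.
            Class<? extends NamedSketchCommitter> cls =
                Class.forName(name.trim()).asSubclass(NamedSketchCommitter.class);
            return cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + name, e);
        }
    }
}
```

Using {{asSubclass}} means a misconfigured class name fails fast when the committer is created, rather than later in the job.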
This has no impact on normal commit operations, as the default factory creates
FileOutputCommitters, as before. It merely offers the ability to change
committers and makes it easy to do so for specific filesystems.
This has now been tested all the way through Spark: with a change to its
Parquet commit logic (SPARK-22217) and a special PathOutputCommitter, this
feature lets us write data directly to S3 through Hadoop MR and Spark, with ORC,
Parquet and other formats. It is also ready to support any other committers,
general or filesystem-specific, that people write.
Testing: unit tests here, functional ITests in hadoop-aws, downstream tests
elsewhere.
Things to consider/refine:
* Config option names?
* Best way to document. I've got an empty option in mapred-default.xml;
otherwise it's all in the javadocs.
* Should the factory code all go into its own package, e.g.
{{org.apache.hadoop.mapreduce.lib.output.factory}}?
> FileOutputFormat to support configurable PathOutputCommitter factory
> --------------------------------------------------------------------
>
> Key: MAPREDUCE-6823
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-6823
> Project: Hadoop Map/Reduce
> Issue Type: Bug
> Components: mrv2
> Affects Versions: 3.0.0-alpha2
> Environment: Targeting S3 as the output of work
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Attachments: HADOOP-13786-HADOOP-13345-001.patch,
> MAPREDUCE-6823-002.patch
>
>
> In HADOOP-13786 I'm adding a custom subclass of FileOutputFormat, one which
> can talk directly to the S3A filesystem for more efficient operations, better
> failure modes, and, most critically, as part of HADOOP-13345, atomic commit
> of output. The normal committer relies on directory rename() being atomic for
> this; for S3 we don't have that luxury.
> To support a custom committer, we need to be able to tell FileOutputFormat
> (and implicitly, all subclasses which don't have their own custom committer)
> to use our new {{S3AOutputCommitter}}.
> I propose:
> # {{FileOutputFormat}} takes a factory to create committers.
> # The factory to take a URI and {{TaskAttemptContext}} and return a committer
> # the default implementation always returns a {{FileOutputCommitter}}
> # A configuration option allows a new factory to be named
> # An {{S3AOutputCommitterFactory}} to return a {{FileOutputCommitter}} or
> new {{S3AOutputCommitter}} depending upon the URI of the destination.
> Note that MRv1 already supports configurable committers; this is only for the
> V2 API.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)