GitHub user mccheah opened a pull request:
https://github.com/apache/spark/pull/4155
[SPARK-4879] Use the Spark driver to authorize Hadoop commits.
This is a version of https://github.com/apache/spark/pull/4066/ that is up
to date with master and includes unit tests.
Previously, SparkHadoopWriter always committed its tasks without question.
The problem is that when speculation is enabled, this can result in multiple
task attempts committing their output for the same partition. Even though an
HDFS-writing task may be re-launched due to speculation, the original attempt
is not killed and may eventually commit as well. The committing attempts then
race with each other, and some partition files can end up lost entirely. For
more context on these kinds of scenarios, see the aforementioned JIRA ticket.
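For illustration, here is a rough sketch of the pre-patch commit path being
described (names simplified; this is not the actual SparkHadoopWriter code):

    import org.apache.hadoop.mapred.{OutputCommitter, TaskAttemptContext}

    // Sketch only: before this patch, an attempt that finished writing
    // committed without consulting anyone. With speculation, two attempts
    // for the same partition can both reach commitTask and race on the
    // final output location.
    def commitUnconditionally(committer: OutputCommitter,
                              ctx: TaskAttemptContext): Unit = {
      if (committer.needsTaskCommit(ctx)) {
        committer.commitTask(ctx)  // no coordination with the driver
      }
    }
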
In Hadoop MapReduce jobs, the application master is a central coordinator
that decides whether any given task may commit. Before a task commits its
output, it asks the application master whether the commit is safe, and the
application master keeps track of which tasks have requested commits.
Duplicate attempts that would write to files already written by other tasks
are refused permission to commit.
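The check described here amounts to an ask-before-commit pattern, sketched
below; the canCommit callback stands in for the query to the application
master (or, in this patch, to the driver-side coordinator) and is an
illustrative name, not an actual Hadoop or Spark API:

    import org.apache.hadoop.mapred.{OutputCommitter, TaskAttemptContext}

    // Sketch of the ask-before-commit protocol: the task only commits if
    // the central coordinator says it is safe; a losing duplicate attempt
    // aborts its output instead of clobbering the committed partition.
    def commitIfAuthorized(canCommit: () => Boolean,
                           committer: OutputCommitter,
                           ctx: TaskAttemptContext): Unit = {
      if (committer.needsTaskCommit(ctx)) {
        if (canCommit()) {
          committer.commitTask(ctx)
        } else {
          committer.abortTask(ctx)
        }
      }
    }
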
This patch emulates that functionality - the crucial missing component was
a central arbitrator, which is now a module called the OutputCommitCoordinator.
The coordinator lives on the driver; executors obtain a reference to this
actor and ask its permission before committing. As the DAGScheduler reports
tasks as having completed successfully or unsuccessfully, the commit
coordinator is also informed of those completion events so it can update its
internal state.
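A minimal sketch of the kind of driver-side bookkeeping described above,
assuming a canCommit-style query keyed by (stage, partition); the class and
method names here are illustrative, not necessarily those introduced by the
patch:

    import scala.collection.mutable

    // Illustrative sketch only: the first attempt to ask for a given
    // (stage, partition) wins; later attempts are refused. If the winning
    // attempt is later reported as failed, the slot is released so a
    // re-launched attempt can commit.
    class CommitArbiterSketch {
      private val authorized = mutable.Map.empty[(Int, Int), Long]

      // Called by an executor before it commits a partition's output.
      def canCommit(stageId: Int, partitionId: Int, attemptId: Long): Boolean =
        synchronized {
          authorized.get((stageId, partitionId)) match {
            case Some(winner) => winner == attemptId
            case None =>
              authorized((stageId, partitionId)) = attemptId
              true
          }
        }

      // Fed by task completion events from the DAGScheduler.
      def onTaskCompleted(stageId: Int, partitionId: Int, attemptId: Long,
                          successful: Boolean): Unit = synchronized {
        if (!successful &&
            authorized.get((stageId, partitionId)) == Some(attemptId)) {
          authorized -= ((stageId, partitionId))
        }
      }
    }
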
Future work includes more rigorous unit testing and further optimization
should this patch cause a performance regression. It is not yet clear what
the overall cost of the extra round trip to the driver for every
Hadoop-committing task will be.
cc @MingyuKim @JoshRosen @ash211
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/mccheah/spark commit-coordination
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/4155.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4155
----
commit 6e6f748561c5c3b8fa08796d39553b83216a2f29
Author: mcheah <[email protected]>
Date: 2015-01-21T18:24:41Z
[SPARK-4879] Use the Spark driver to authorize Hadoop commits.
Previously, SparkHadoopWriter always committed its tasks without
question. The problem is that when speculation is turned on, sometimes
this can result in multiple tasks committing their output to the same
partition. Even though an HDFS-writing task may be re-launched due to
speculation, the original task is not killed and may eventually commit
as well.
This can cause strange race conditions where multiple tasks that commit
interfere with each other, with the result being that some partition
files are actually lost entirely. For more context on these kinds of
scenarios, see the aforementioned JIRA ticket.
In Hadoop MapReduce jobs, the application master is a central
coordinator that authorizes whether or not any given task can commit.
Before a task commits its output, it queries the application master as
to whether or not such a commit is safe, and the application master does
bookkeeping as tasks are requesting commits. Duplicate tasks that would
write to files that were already written to from other tasks are
prohibited from committing.
This patch emulates that functionality - the crucial missing component was
a central arbitrator, which is now a module called the
OutputCommitCoordinator.
The coordinator lives on the driver and the executors can obtain a reference
to this actor and request its permission to commit. As tasks commit and are
reported as completed successfully or unsuccessfully by the DAGScheduler,
the commit coordinator is informed of the task completion events as well
to update its internal state.
commit bc80770fea44474aba5e1ece57968e7c8c311f85
Author: mcheah <[email protected]>
Date: 2015-01-22T01:31:39Z
Unit tests for OutputCommitCoordinator
commit c9decc68377db246e720afe802a91852b043e364
Author: mcheah <[email protected]>
Date: 2015-01-22T01:34:05Z
Scalastyle fixes
commit 6b543bae5d9a6ea27408a57663986d3947ea7ab1
Author: mcheah <[email protected]>
Date: 2015-01-22T01:52:14Z
Removing redundant accumulator in unit test
----