GitHub user andrewor14 opened a pull request:
https://github.com/apache/spark/pull/10717
[SPARK-10620][WIP] Migrate TaskMetrics to accumulators
There exist two mechanisms to pass metrics from executors to drivers:
accumulators and `TaskMetrics`. Currently we send both to the driver
using two separate code paths. This is an unnecessary maintenance burden and
makes the code more difficult to follow.
This patch proposes that we send only accumulators to the driver.
Additionally, it reimplements `TaskMetrics` using accumulators such that the
new `TaskMetrics` serves mainly as syntactic sugar to increment and access
the values of the underlying accumulators. It migrates the rest of the metrics
to adopt the code path already used by the existing `PEAK_EXECUTION_MEMORY`.
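To make the "syntactic sugar" idea concrete, here is a minimal sketch in Python. It is not Spark's Scala API: `LongAccumulator`, `TaskMetricsSketch`, and the `EXECUTOR_RUN_TIME` name are hypothetical, and only `PEAK_EXECUTION_MEMORY` is a name taken from this description. The point is that the metrics object holds no state of its own; every read and write is forwarded to a named accumulator, and only the accumulator values need to be sent to the driver.

```python
class LongAccumulator:
    """Hypothetical stand-in for a named, additive accumulator."""

    def __init__(self, name):
        self.name = name
        self.value = 0

    def add(self, delta):
        self.value += delta


class TaskMetricsSketch:
    """Thin facade: every metric read or write goes to an accumulator."""

    def __init__(self, accumulators):
        # Index the accumulators by name so the metric methods can find them.
        self._accums = {a.name: a for a in accumulators}

    def inc_peak_execution_memory(self, delta):
        self._accums["PEAK_EXECUTION_MEMORY"].add(delta)

    @property
    def peak_execution_memory(self):
        return self._accums["PEAK_EXECUTION_MEMORY"].value

    def accumulator_updates(self):
        # Only these (name, value) pairs would travel to the driver.
        return sorted((a.name, a.value) for a in self._accums.values())


metrics = TaskMetricsSketch([
    LongAccumulator("PEAK_EXECUTION_MEMORY"),
    LongAccumulator("EXECUTOR_RUN_TIME"),  # hypothetical second metric
])
metrics.inc_peak_execution_memory(1024)
updates = metrics.accumulator_updates()
```

With this shape, adding a metric means adding one accumulator and one pair of accessor methods, with no separate serialization path.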
While an effort has been made to preserve as much of the public API as
possible, a few `@DeveloperApi` fields would have been very awkward to
maintain, so this patch makes the following known breaking changes:
- `TaskMetrics#hostname` field was removed; there were no consumers except
the event log
- `ExceptionFailure#taskMetrics` field was replaced with `accumUpdates`
- `SparkListenerExecutorMetricsUpdate#taskMetrics` field was replaced with
`accumUpdates`
This is WIP because I would like to add tests for some of the intricate
cases that I ran into while implementing this.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/andrewor14/spark task-metrics-to-accums
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/10717.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #10717
----
commit 8d5765751c500de5fb699100d5b10eebbcab4e5e
Author: Andrew Or <[email protected]>
Date: 2015-12-30T02:27:27Z
Remove unused methods or replace them
There are a bunch of decrement X methods that were not used.
Also, there are a few set X methods that could have easily just
been increment X. The latter change is more in line with
accumulators.
commit 1ad286813b758fda992c3d85406a7a628c39d1f7
Author: Andrew Or <[email protected]>
Date: 2016-01-04T21:34:42Z
Implement initial framework to migrate metrics to accums
This commit uses the existing PEAK_EXECUTION_MEMORY mechanism
to bring a few other fields in TaskMetrics to use accumulators.
commit a4ca6b2de82dac5f255cd40ace83c77b508ac43d
Author: Andrew Or <[email protected]>
Date: 2016-01-04T21:54:05Z
Migrate a few more easy metrics
commit 373898e7bf7f887fc008638625072ed1b68d8359
Author: Andrew Or <[email protected]>
Date: 2016-01-04T23:39:37Z
ShuffleReadMetrics + namespacing accumulators
This commit ports ShuffleReadMetrics to use accumulators,
preserving as much of the existing semantics as possible.
It also introduces a nicer way to organize all the internal
accumulators by namespacing them.
commit e74632c3e0ed6b20f52f112a0cfdad27e036a827
Author: Andrew Or <[email protected]>
Date: 2016-01-05T21:01:08Z
General code cleanup
commit 7e74bf38ac13e6eb148b15dd95e4241de1f7c118
Author: Andrew Or <[email protected]>
Date: 2016-01-05T21:47:24Z
ShuffleWriteMetrics
commit 396088d1fff77dd17d280ba3297c4b18d142b99f
Author: Andrew Or <[email protected]>
Date: 2016-01-05T22:17:51Z
OutputMetrics
commit 0404e3e8f291f6cb8b007fb09d252ee2bb513308
Author: Andrew Or <[email protected]>
Date: 2016-01-05T23:52:31Z
InputMetrics
This commit was a little tricky because it ripped the bytes read
callback from TaskMetrics and related classes. It does change
behavior in the sense that now we periodically update the number
of bytes read (every 1000 records) instead of doing it every time
we send an executor heartbeat. The advantage here is code
simplicity.
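The behavior change described in this commit can be sketched roughly as follows. This is a Python illustration, not the Scala code in the patch: `InputMetricsSketch`, `read_records`, and the `bytes_read_so_far` callable are hypothetical names, and the final refresh after the loop is an assumption of this sketch rather than something the commit message states.

```python
UPDATE_INTERVAL = 1000  # records between refreshes, per the commit message


class InputMetricsSketch:
    """Hypothetical holder for the two input metrics used below."""

    def __init__(self):
        self.records_read = 0
        self.bytes_read = 0


def read_records(records, bytes_read_so_far, metrics):
    """Yield records, refreshing metrics.bytes_read every UPDATE_INTERVAL records.

    bytes_read_so_far is a hypothetical zero-argument callable that reports
    the total number of bytes consumed from the underlying source.
    """
    for record in records:
        metrics.records_read += 1
        if metrics.records_read % UPDATE_INTERVAL == 0:
            metrics.bytes_read = bytes_read_so_far()
        yield record
    # Assumed final refresh so the total is exact once the input is exhausted.
    metrics.bytes_read = bytes_read_so_far()


consumed = [0]  # running byte count of the fake source


def source(n, size):
    for _ in range(n):
        consumed[0] += size
        yield b"x" * size


metrics = InputMetricsSketch()
snapshots = []
reader = read_records(source(2500, 10), lambda: consumed[0], metrics)
for i, _ in enumerate(reader, start=1):
    if i in (999, 1000, 2499):
        snapshots.append(metrics.bytes_read)
```

Between refreshes the reported value lags the true count by at most `UPDATE_INTERVAL - 1` records, which is the trade-off accepted here in exchange for dropping the callback.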
commit 17becb05aa347586fe07fd452e288479c7649e82
Author: Andrew Or <[email protected]>
Date: 2016-01-06T00:06:31Z
Fix JsonProtocol + JsonProtocolSuite
commit 809a93ac3b4961e35bfb180fa1e46bc36296153c
Author: Andrew Or <[email protected]>
Date: 2016-01-06T00:25:22Z
Fix tests where TaskMetrics had no accumulators
Tests are still failing as of this commit. E.g. SortShuffleSuite.
commit 32ba9e3b78f679261e36b4ea095733a14b30fd7b
Author: Andrew Or <[email protected]>
Date: 2016-01-06T00:29:18Z
Rename a few shuffle write metrics for consistency
commit 78fb33e3b7714c3ffff932434000780d17835c24
Author: Andrew Or <[email protected]>
Date: 2016-01-06T02:06:10Z
Fix metrics in local mode (tests)
Tests were previously failing because we ended up double counting
metrics in local mode. This is because each TaskContext shares
the same list of accumulators, so they end up updating the metrics
on top of each other. The fix is to ensure TaskContext clears any
existing values on the accumulators before passing them on.
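A small Python sketch of the double-counting problem and the fix described above. All names (`LongAccumulator`, `TaskContextSketch`, `run_task`, `BYTES_WRITTEN`) are hypothetical, and the `reset_on_create` flag exists only to show both behaviors side by side.

```python
class LongAccumulator:
    """Hypothetical additive accumulator that can be cleared."""

    def __init__(self, name):
        self.name = name
        self.value = 0

    def add(self, delta):
        self.value += delta

    def reset(self):
        self.value = 0


class TaskContextSketch:
    """Holds the accumulators for one task, optionally clearing them first."""

    def __init__(self, accumulators, reset_on_create):
        if reset_on_create:
            # The fix: clear stale values left behind by an earlier task.
            for acc in accumulators:
                acc.reset()
        self.accumulators = accumulators


def run_task(shared, reset_on_create):
    ctx = TaskContextSketch(shared, reset_on_create)
    ctx.accumulators[0].add(100)  # the task itself only contributes 100
    return ctx.accumulators[0].value  # value this task would report


shared = [LongAccumulator("BYTES_WRITTEN")]
without_reset = [run_task(shared, False) for _ in range(2)]
shared[0].reset()
with_reset = [run_task(shared, True) for _ in range(2)]
```

Without the reset, the second task reports the first task's contribution on top of its own; with it, each task reports only what it did.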
commit 8117898d585d62e536c7c87f56bc545118cc97b1
Author: Andrew Or <[email protected]>
Date: 2016-01-06T02:42:25Z
Fix "harmless" exception in peak memory tests
The exception was harmless because it didn't actually fail the
test. However, the test harness was actually badly written. We
used to always assume that the first job will have an ID of 0,
but there could very well be other tests sharing the same
SparkContext. This is now fixed and we no longer see the exception.
As of this commit, all known test failures have been fixed.
I'm sure there will be more...
commit 6bd9c0a1caaac38be75f4c8269225a53c75d6509
Author: Andrew Or <[email protected]>
Date: 2016-01-06T19:13:07Z
Simplify internal accumulator update mechanism
Instead of passing in a callback, we can just return the
accumulator values directly, since we already have them. "We"
here refers to TaskMetrics.
commit ed293282118c414e6d5c405a9c2222616ce9d3bd
Author: Andrew Or <[email protected]>
Date: 2016-01-06T21:43:24Z
Fix tests
commit 20119126a26420b30c4bba417d2c04aaffac305a
Author: Andrew Or <[email protected]>
Date: 2016-01-06T23:19:56Z
Clean up
This commit addresses outstanding TODOs and marks the deprecated
APIs as DeveloperApi instead, which lets us decide how to do the
deprecation properly later. This commit also reverts a few
unnecessary changes to reduce the size of the diff.
commit c3de4f0c8e5dcfbfe4dbddad293c4dc2a8fddaf4
Author: Andrew Or <[email protected]>
Date: 2016-01-06T23:33:40Z
Merge branch 'master' of github.com:apache/spark into task-metrics-to-accums
Conflicts:
core/src/main/scala/org/apache/spark/TaskContextImpl.scala
sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
commit 9222f11d2b838b3bd61d62396719e44cb728bf66
Author: Andrew Or <[email protected]>
Date: 2016-01-07T03:55:50Z
Do not send TaskMetrics from executors to driver
Instead, send only accumulator updates. As of this commit
TaskMetrics is only used as syntactic sugar on the executor
side to modify accumulator values by names. Now we no longer
send the same thing in two different codepaths.
Now that we never send TaskMetrics from executors to the driver,
we also never send accumulators that way. Then we can revert some
of the accumulator changes.
commit 2add53f8f83eeec9dd9338a18c32121ddded8afc
Author: Andrew Or <[email protected]>
Date: 2016-01-07T04:08:14Z
Restore accumulator serialization semantics
In the previous commit, we made accumulator communication one-way
again, which is the same as before this patch, so we restored all
the semantics involved in serializing accumulators as before.
Note: tests are still failing because of a duplicate accumulator
name in some SQL things. Run `DataFrameCallbackSuite` for more
detail.
commit afe957c5f28a1c24cd49194e60fb309e301aa427
Author: Andrew Or <[email protected]>
Date: 2016-01-07T21:06:12Z
Fix semantics of accumulators when tasks fail
Currently we still get values for tasks that fail. We should
keep these semantics in the new accumulator updates as well.
commit c7240f3dd4490e81905131841596eee72be72898
Author: Andrew Or <[email protected]>
Date: 2016-01-07T21:38:55Z
Fix a few more tests
There are a few places where we passed in empty internal
accumulators to TaskContextImpl, so the TaskMetrics creation
would fail. These are now fixed.
commit fa086c36647520f6e04622003a8e795bbffd98ad
Author: Andrew Or <[email protected]>
Date: 2016-01-08T01:24:35Z
Fix SQL UI
Before this commit the SQL UI would not display any accumulators.
This is because it is powered by the SQLListener, which reads
accumulators from TaskMetrics. However, we did not update the
accumulator values before posting the TaskMetrics, so the UI
never saw the updates from the tasks.
This commit also fixes a few related test failures.
commit 361442e30ae5938a23e93a276feae1112ce816a0
Author: Andrew Or <[email protected]>
Date: 2016-01-08T02:52:28Z
Clean up: lift odd unique name requirement
Now internal accumulators no longer need to have unique names.
This was an unnecessary hack for the SQL accumulators that can
be reverted through some clean ups.
commit 5aa6aa1bb60d4dead372b307efb1b3b725b4e409
Author: Andrew Or <[email protected]>
Date: 2016-01-08T18:59:59Z
Move smaller metrics classes to their own files
for readability.
commit 7118be5c16e98bcae7b5effa1ef462dc8f780655
Author: Andrew Or <[email protected]>
Date: 2016-01-08T19:11:07Z
Fix SQLQuerySuite
A few bugs:
(1) In Executor.scala, we updated TaskMetrics after collecting the
accumulator values. We should do it in the other order.
(2) The test utility method of verifying whether peak execution
memory is set imposed this requirement on every single job run
in the test body. This does not apply to SQL's external sort,
however, because one of the jobs does a sample and so does not
update peak execution memory.
(3) We were getting accumulators from executors that were not
registered on the driver. Not exactly sure what the cause is but
it could very well have to do with GC on the driver since we use
weak references there. We shouldn't crash the scheduler if this
happens.
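Bug (3) can be illustrated with a Python sketch that uses the standard `weakref` module. This is only an analogy for the driver-side behavior described above, not the scheduler code: `registry`, `register`, and `apply_updates` are hypothetical names, and the garbage-collection cause is the commit author's guess, not a confirmed diagnosis.

```python
import gc
import weakref


class LongAccumulator:
    """Hypothetical accumulator identified by an integer id."""

    def __init__(self, acc_id):
        self.id = acc_id
        self.value = 0


# Driver-side registry that does not keep accumulators alive by itself.
registry = weakref.WeakValueDictionary()


def register(acc):
    registry[acc.id] = acc


def apply_updates(updates):
    """Merge (id, delta) pairs and return the ids that had to be skipped."""
    skipped = []
    for acc_id, delta in updates:
        acc = registry.get(acc_id)
        if acc is None:
            # Unknown or already collected: skip it instead of raising.
            skipped.append(acc_id)
            continue
        acc.value += delta
    return skipped


live = LongAccumulator(1)
register(live)

temp = LongAccumulator(2)
register(temp)
del temp      # drop the only strong reference
gc.collect()  # make the collection explicit

skipped = apply_updates([(1, 5), (2, 7)])
```

Returning the skipped ids instead of raising lets the caller log the anomaly while the remaining updates are still applied.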
commit 176e91d0abfaed3d8741821787a3732b0b9f3688
Author: Andrew Or <[email protected]>
Date: 2016-01-08T21:22:21Z
Remove unused hostname from TaskMetrics
commit 7939e1c74d9dd62e383c71c72d9b353c3f6d8dfc
Author: Andrew Or <[email protected]>
Date: 2016-01-08T22:18:17Z
Reinitialize OutputMetrics et al during reconstruction
Such that downstream listeners can access their values.
This commit also generalizes the internal accumulator type from
Long to anything, since we need to store the read and write
methods of InputMetrics and OutputMetrics respectively.
commit c029f62be0628af904fe1a1ad795e8208f55e51d
Author: Andrew Or <[email protected]>
Date: 2016-01-08T23:52:02Z
Fix *ShuffleSuite
This fixes a bug where, when we reconstructed TaskMetrics, we
passed in the mutable accumulators themselves, so that new tasks
coming in changed the values recorded for the old tasks. A more
subtle bug here is that we were passing in the accumulated values
instead of the local task values. Both are now fixed.
TODO: write a test for all of these please.
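The two bugs fixed here can be reproduced in miniature. The following Python sketch uses hypothetical names (`LongAccumulator`, `reconstruct_metrics`, `RECORDS_READ`) and is not the patch's code. The buggy path stores a reference to the live, merged accumulator for each task. The fixed path copies each task's local value.

```python
class LongAccumulator:
    """Hypothetical driver-side accumulator merged across all tasks."""

    def __init__(self, name):
        self.name = name
        self.value = 0

    def merge(self, local_value):
        self.value += local_value


def reconstruct_metrics(local_updates):
    """Build a per-task view by copying that task's local values."""
    return dict(local_updates)


driver_acc = LongAccumulator("RECORDS_READ")
buggy_views = []  # each entry aliases the live accumulator
fixed_views = []  # each entry is an independent copy of local values

for local in (10, 32):  # two tasks reporting their local counts
    driver_acc.merge(local)
    buggy_views.append(driver_acc)
    fixed_views.append(reconstruct_metrics({driver_acc.name: local}))

buggy = [view.value for view in buggy_views]
fixed = [view["RECORDS_READ"] for view in fixed_views]
```

In the buggy list every task appears to have read 42 records, the running total, because all entries point at the same mutable object. The fixed list keeps 10 and 32, and the total of 42 remains available from the driver-side accumulator.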
commit b3c51dd3930f8c304f0fc67fc476d95d066f9a1e
Author: Andrew Or <[email protected]>
Date: 2016-01-09T01:45:48Z
Fix DAGSchedulerSuite
The fake accumulator values should no longer all be Longs. Ugh.
commit d531f3fff759dbfca6f24eb41ec37428ea055785
Author: Andrew Or <[email protected]>
Date: 2016-01-09T06:51:17Z
Simplify accumulator update code a little
----