GitHub user andrewor14 opened a pull request:

    https://github.com/apache/spark/pull/10717

    [SPARK-10620][WIP] Migrate TaskMetrics to accumulators

    There exist two mechanisms to pass metrics from executors to drivers: 
accumulators and `TaskMetrics`. Currently we send both things to the driver 
using two separate code paths. This is an unnecessary maintenance burden and 
makes the code more difficult to follow.
    
    This patch proposes that we send only accumulators to the driver. 
Additionally, it reimplements `TaskMetrics` using accumulators such that the 
new `TaskMetrics` serves mainly as syntactic sugar to increment and access 
the values of the underlying accumulators. It migrates the rest of the metrics 
to adopt the code path already used by the existing `PEAK_EXECUTION_MEMORY`.
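
As a rough illustration of the "syntactic sugar" idea (this is not the code in the patch), the sketch below backs each metric with a named accumulator and exposes thin getters and incrementers on top. `LongAccum`, `TaskMetricsSketch`, and the `sketch.*` names are hypothetical stand-ins chosen for this example, and only two metrics are shown.

```scala
// Hypothetical, simplified stand-in for an internal accumulator.
final class LongAccum(val name: String) {
  private var _value = 0L
  def add(v: Long): Unit = _value += v
  def value: Long = _value
}

// Sketch: every metric is backed by a named accumulator, and the
// getters/incrementers are thin wrappers around it.
final class TaskMetricsSketch {
  private val executorRunTimeAccum = new LongAccum("sketch.executorRunTime")
  private val peakExecutionMemoryAccum = new LongAccum("sketch.peakExecutionMemory")
  private val accums = Seq(executorRunTimeAccum, peakExecutionMemoryAccum)

  def executorRunTime: Long = executorRunTimeAccum.value
  def incExecutorRunTime(v: Long): Unit = executorRunTimeAccum.add(v)

  def peakExecutionMemory: Long = peakExecutionMemoryAccum.value
  def incPeakExecutionMemory(v: Long): Unit = peakExecutionMemoryAccum.add(v)

  // The only payload that would travel to the driver: name -> value.
  def accumUpdates: Map[String, Long] = accums.map(a => a.name -> a.value).toMap
}
```

With this shape, the executor only ever ships `accumUpdates`, and anything that wants a `TaskMetrics`-like view can rebuild it from those name/value pairs.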
    
    While an effort has been made to preserve as much of the public API as 
possible, this patch knowingly breaks a few `@DeveloperApi` fields that would 
have been very awkward to maintain. These are:
    - `TaskMetrics#hostname` field was removed; there were no consumers except 
the event log
    - `ExceptionFailure#taskMetrics` field was replaced with `accumUpdates`
    - `SparkListenerExecutorMetricsUpdate#taskMetrics` field was replaced with 
`accumUpdates`
    
    This is WIP because I would like to add tests for some of the intricate 
cases that I ran into while implementing this.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/andrewor14/spark task-metrics-to-accums

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/10717.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #10717
    
----
commit 8d5765751c500de5fb699100d5b10eebbcab4e5e
Author: Andrew Or <[email protected]>
Date:   2015-12-30T02:27:27Z

    Remove unused methods or replace them
    
    There are a bunch of decrement X methods that were not used.
    Also, there are a few set X methods that could have easily just
    been increment X. The latter change is more in line with
    accumulators.

commit 1ad286813b758fda992c3d85406a7a628c39d1f7
Author: Andrew Or <[email protected]>
Date:   2016-01-04T21:34:42Z

    Implement initial framework to migrate metrics to accums
    
    This commit uses the existing PEAK_EXECUTION_MEMORY mechanism
    to bring a few other fields in TaskMetrics to use accumulators.

commit a4ca6b2de82dac5f255cd40ace83c77b508ac43d
Author: Andrew Or <[email protected]>
Date:   2016-01-04T21:54:05Z

    Migrate a few more easy metrics

commit 373898e7bf7f887fc008638625072ed1b68d8359
Author: Andrew Or <[email protected]>
Date:   2016-01-04T23:39:37Z

    ShuffleReadMetrics + namespacing accumulators
    
    This commit ports ShuffleReadMetrics to use accumulators,
    preserving as much of the existing semantics as possible.
    It also introduces a nicer way to organize all the internal
    accumulators by namespacing them.

commit e74632c3e0ed6b20f52f112a0cfdad27e036a827
Author: Andrew Or <[email protected]>
Date:   2016-01-05T21:01:08Z

    General code cleanup

commit 7e74bf38ac13e6eb148b15dd95e4241de1f7c118
Author: Andrew Or <[email protected]>
Date:   2016-01-05T21:47:24Z

    ShuffleWriteMetrics

commit 396088d1fff77dd17d280ba3297c4b18d142b99f
Author: Andrew Or <[email protected]>
Date:   2016-01-05T22:17:51Z

    OutputMetrics

commit 0404e3e8f291f6cb8b007fb09d252ee2bb513308
Author: Andrew Or <[email protected]>
Date:   2016-01-05T23:52:31Z

    InputMetrics
    
    This commit was a little tricky because it ripped the bytes-read
    callback out of TaskMetrics and related classes. It does change
    behavior in the sense that now we periodically update the number
    of bytes read (every 1000 records) instead of doing it every time
    we send an executor heartbeat. The advantage here is code
    simplicity.
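
A minimal sketch of the "refresh every N records" pattern described above, assuming a hypothetical `BytesReadTracker` helper and a caller-supplied `getBytesRead` function (neither is taken from the patch). The interval of 1000 comes from the commit message; everything else is illustrative.

```scala
// Hypothetical helper: refreshes the bytes-read metric from `getBytesRead`
// once every `updateInterval` records instead of on each heartbeat.
final class BytesReadTracker(getBytesRead: () => Long, updateInterval: Int = 1000) {
  private var recordsRead = 0L
  private var _bytesRead = 0L

  // Last value published to the metric; may lag behind the true count.
  def bytesRead: Long = _bytesRead

  // Call once per record consumed from the input.
  def onRecord(): Unit = {
    recordsRead += 1
    if (recordsRead % updateInterval == 0) refresh()
  }

  // Call when the input is exhausted so the final value is exact.
  def refresh(): Unit = { _bytesRead = getBytesRead() }
}
```

The trade-off is that the published value can be stale by up to `updateInterval - 1` records until `refresh()` is called at the end of the input.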

commit 17becb05aa347586fe07fd452e288479c7649e82
Author: Andrew Or <[email protected]>
Date:   2016-01-06T00:06:31Z

    Fix JsonProtocol + JsonProtocolSuite

commit 809a93ac3b4961e35bfb180fa1e46bc36296153c
Author: Andrew Or <[email protected]>
Date:   2016-01-06T00:25:22Z

    Fix tests where TaskMetrics had no accumulators
    
    Tests are still failing as of this commit. E.g. SortShuffleSuite.

commit 32ba9e3b78f679261e36b4ea095733a14b30fd7b
Author: Andrew Or <[email protected]>
Date:   2016-01-06T00:29:18Z

    Rename a few shuffle write metrics for consistency

commit 78fb33e3b7714c3ffff932434000780d17835c24
Author: Andrew Or <[email protected]>
Date:   2016-01-06T02:06:10Z

    Fix metrics in local mode (tests)
    
    Tests were previously failing because we end up double counting
    metrics in local mode. This is because each TaskContext shares
    the same list of accumulators, so they end up updating the metrics
    on top of each other. The fix is to ensure TaskContext clears any
    existing values on the accumulators before passing them on.
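
To make the double counting concrete, here is a small sketch under the assumption that tasks in the same JVM receive the same accumulator instances. `SharedAccum` and `TaskContextSketch` are hypothetical names for this example, not the classes touched by the commit.

```scala
// Hypothetical accumulator whose task-local value lives on the instance.
final class SharedAccum(val name: String) {
  private var _localValue = 0L
  def add(v: Long): Unit = _localValue += v
  def localValue: Long = _localValue
  def resetLocalValue(): Unit = { _localValue = 0L }
}

// Sketch of the fix: clear any values left behind by a previous task that
// used the same accumulator instances before handing them to this task.
final class TaskContextSketch(accums: Seq[SharedAccum]) {
  accums.foreach(_.resetLocalValue())
  private val byName = accums.map(a => a.name -> a).toMap

  def inc(name: String, v: Long): Unit = byName(name).add(v)
  def updates: Map[String, Long] = accums.map(a => a.name -> a.localValue).toMap
}
```

Without the reset in the constructor, a second task created over the same `Seq[SharedAccum]` would report its own increments on top of the first task's.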

commit 8117898d585d62e536c7c87f56bc545118cc97b1
Author: Andrew Or <[email protected]>
Date:   2016-01-06T02:42:25Z

    Fix "harmless" exception in peak memory tests
    
    The exception was harmless because it didn't actually fail the
    test. However, the test harness was actually badly written. We
    used to always assume that the first job would have an ID of 0,
    but there could very well be other tests sharing the same
    SparkContext. This is now fixed and we no longer see the exception.
    
    As of this commit, all known test failures have been fixed.
    I'm sure there will be more...

commit 6bd9c0a1caaac38be75f4c8269225a53c75d6509
Author: Andrew Or <[email protected]>
Date:   2016-01-06T19:13:07Z

    Simplify internal accumulator update mechanism
    
    Instead of passing in a callback, TaskMetrics can just return the
    accumulator values directly, since it already has them.

commit ed293282118c414e6d5c405a9c2222616ce9d3bd
Author: Andrew Or <[email protected]>
Date:   2016-01-06T21:43:24Z

    Fix tests

commit 20119126a26420b30c4bba417d2c04aaffac305a
Author: Andrew Or <[email protected]>
Date:   2016-01-06T23:19:56Z

    Clean up
    
    This commit addresses outstanding TODOs and makes the deprecated
    APIs DeveloperApi instead. This allows us to deal with how to
    do the deprecation properly later. This commit also reverts a few
    unnecessary changes to reduce the size of the diff.

commit c3de4f0c8e5dcfbfe4dbddad293c4dc2a8fddaf4
Author: Andrew Or <[email protected]>
Date:   2016-01-06T23:33:40Z

    Merge branch 'master' of github.com:apache/spark into task-metrics-to-accums
    
    Conflicts:
        core/src/main/scala/org/apache/spark/TaskContextImpl.scala
        sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala

commit 9222f11d2b838b3bd61d62396719e44cb728bf66
Author: Andrew Or <[email protected]>
Date:   2016-01-07T03:55:50Z

    Do not send TaskMetrics from executors to driver
    
    Instead, send only accumulator updates. As of this commit
    TaskMetrics is only used as syntactic sugar on the executor
    side to modify accumulator values by names. Now we no longer
    send the same thing in two different codepaths.
    
    Now that we never send TaskMetrics from executors to the driver,
    we also never send accumulators that way. Then we can revert some
    of the accumulator changes.

commit 2add53f8f83eeec9dd9338a18c32121ddded8afc
Author: Andrew Or <[email protected]>
Date:   2016-01-07T04:08:14Z

    Restore accumulator serialization semantics
    
    In the previous commit, we made accumulator communication one-way
    again, which is the same as before this patch, so we restored all
    the semantics involved in serializing accumulators as before.
    
    Note: tests are still failing because of a duplicate accumulator
    name in some SQL things. Run `DataFrameCallbackSuite` for more
    detail.

commit afe957c5f28a1c24cd49194e60fb309e301aa427
Author: Andrew Or <[email protected]>
Date:   2016-01-07T21:06:12Z

    Fix semantics of accumulators when tasks fail
    
    Currently we still get accumulator values for tasks that fail. We
    should preserve these semantics in the new accumulator updates as well.

commit c7240f3dd4490e81905131841596eee72be72898
Author: Andrew Or <[email protected]>
Date:   2016-01-07T21:38:55Z

    Fix a few more tests
    
    There are a few places where we passed in empty internal
    accumulators to TaskContextImpl, so the TaskMetrics creation
    would fail. These are now fixed.

commit fa086c36647520f6e04622003a8e795bbffd98ad
Author: Andrew Or <[email protected]>
Date:   2016-01-08T01:24:35Z

    Fix SQL UI
    
    Before this commit the SQL UI would not display any accumulators.
    This is because it is powered by the SQLListener, which reads
    accumulators from TaskMetrics. However, we did not update the
    accumulator values before posting the TaskMetrics, so the UI
    never saw the updates from the tasks.
    
    This commit also fixes a few related test failures.

commit 361442e30ae5938a23e93a276feae1112ce816a0
Author: Andrew Or <[email protected]>
Date:   2016-01-08T02:52:28Z

    Clean up: lift odd unique name requirement
    
    Now internal accumulators no longer need to have unique names.
    This was an unnecessary hack for the SQL accumulators that can
    be reverted through some cleanups.

commit 5aa6aa1bb60d4dead372b307efb1b3b725b4e409
Author: Andrew Or <[email protected]>
Date:   2016-01-08T18:59:59Z

    Move smaller metrics classes to their own files
    
    for readability.

commit 7118be5c16e98bcae7b5effa1ef462dc8f780655
Author: Andrew Or <[email protected]>
Date:   2016-01-08T19:11:07Z

    Fix SQLQuerySuite
    
    A few bugs:
    
    (1) In Executor.scala, we updated TaskMetrics after collecting the
    accumulator values. We should do it in the opposite order: update
    TaskMetrics first, then collect the accumulator values.
    
    (2) The test utility method of verifying whether peak execution
    memory is set imposed this requirement on every single job run
    in the test body. This does not apply to SQL's external sort,
    however, because one of the jobs does a sample and so does not
    update peak execution memory.
    
    (3) We were getting accumulators from executors that were not
    registered on the driver. Not exactly sure what the cause is but
    it could very well have to do with GC on the driver since we use
    weak references there. We shouldn't crash the scheduler if this
    happens.
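
For bug (3), one way to tolerate updates for accumulators that are no longer registered is sketched below. It assumes a driver-side registry that holds accumulators through `java.lang.ref.WeakReference`; `DriverAccum` and `AccumRegistrySketch` are hypothetical names, and the real scheduler code may look quite different.

```scala
import java.lang.ref.WeakReference
import scala.collection.mutable

// Hypothetical driver-side accumulator.
final class DriverAccum(val id: Long) {
  private var _value = 0L
  def merge(delta: Long): Unit = _value += delta
  def value: Long = _value
}

// Sketch: the registry only holds weak references, so an entry may be
// missing or already cleared by GC when an update arrives.
final class AccumRegistrySketch {
  private val originals = mutable.Map.empty[Long, WeakReference[DriverAccum]]

  def register(acc: DriverAccum): Unit = {
    originals(acc.id) = new WeakReference(acc)
  }

  // Applies (id, delta) updates and returns the ids that could not be
  // resolved, instead of throwing and taking the scheduler down.
  def applyUpdates(updates: Seq[(Long, Long)]): Seq[Long] = {
    val skipped = mutable.ArrayBuffer.empty[Long]
    updates.foreach { case (id, delta) =>
      originals.get(id).flatMap(ref => Option(ref.get())) match {
        case Some(acc) => acc.merge(delta)
        case None => skipped += id // unknown or garbage-collected
      }
    }
    skipped.toSeq
  }
}
```

Returning the skipped ids lets the caller log them, which keeps the anomaly visible without failing the job.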

commit 176e91d0abfaed3d8741821787a3732b0b9f3688
Author: Andrew Or <[email protected]>
Date:   2016-01-08T21:22:21Z

    Remove unused hostname from TaskMetrics

commit 7939e1c74d9dd62e383c71c72d9b353c3f6d8dfc
Author: Andrew Or <[email protected]>
Date:   2016-01-08T22:18:17Z

    Reinitialize OutputMetrics et al during reconstruction
    
    Such that downstream listeners can access their values.
    This commit also generalizes the internal accumulator type from
    Long to anything, since we need to store the read and write
    methods of InputMetrics and OutputMetrics respectively.
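
The generalization from `Long` to an arbitrary type could look roughly like the sketch below, where the accumulator is parameterized by its value type and a merge function. `InternalAccumSketch`, `AccumSketches`, and the metric names are hypothetical, and a plain `String` stands in for the read or write method here.

```scala
// Hypothetical generic accumulator: the value type and the way two values
// are combined are both supplied by the caller.
final class InternalAccumSketch[T](val name: String, zero: T, merge: (T, T) => T) {
  private var _value: T = zero
  def add(v: T): Unit = { _value = merge(_value, v) }
  def value: T = _value
}

object AccumSketches {
  // Numeric metrics keep summing, as before.
  def longSum(name: String): InternalAccumSketch[Long] =
    new InternalAccumSketch[Long](name, 0L, _ + _)

  // Non-numeric metrics (e.g. a read method) just keep the latest value.
  def lastString(name: String): InternalAccumSketch[String] =
    new InternalAccumSketch[String](name, "", (_, latest) => latest)
}
```

A side effect of this design is that any code consuming a mixed list of accumulators sees values of type `Any`, which is presumably why test fixtures that assumed `Long` everywhere need updating.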

commit c029f62be0628af904fe1a1ad795e8208f55e51d
Author: Andrew Or <[email protected]>
Date:   2016-01-08T23:52:02Z

    Fix *ShuffleSuite
    
    This fixes a bug where when we reconstruct TaskMetrics we just
    pass in mutable accumulators, such that when new tasks come in
    they change the values of the old tasks. A more subtle bug here
    is that we were passing in the accumulated values instead of the
    local task values. Both are now fixed.
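
Both bugs can be illustrated with a small sketch: the driver merges each task's local values into a running total, but what it hands to listeners is an immutable snapshot of that task's own values. `DriverSideAccum`, `TaskMetricsSnapshot`, and `MetricsReconstructor` are hypothetical names used only for this example.

```scala
// Hypothetical driver-side accumulator: keeps a running total across tasks.
final class DriverSideAccum(val name: String) {
  private var _total = 0L
  def merge(taskLocal: Long): Unit = _total += taskLocal
  def total: Long = _total
}

// Immutable per-task view handed to listeners.
final case class TaskMetricsSnapshot(taskId: Long, values: Map[String, Long])

final class MetricsReconstructor(accums: Map[String, DriverSideAccum]) {
  // `taskLocalUpdates` holds this task's own values, not running totals.
  def onTaskEnd(taskId: Long, taskLocalUpdates: Map[String, Long]): TaskMetricsSnapshot = {
    taskLocalUpdates.foreach { case (name, v) => accums.get(name).foreach(_.merge(v)) }
    // Hand out the immutable task-local values rather than the mutable
    // accumulators or their accumulated totals.
    TaskMetricsSnapshot(taskId, taskLocalUpdates)
  }
}
```

Because each snapshot is immutable, a later task can change the running total without altering what was already reported for an earlier task.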
    
    TODO: write a test for all of these please.

commit b3c51dd3930f8c304f0fc67fc476d95d066f9a1e
Author: Andrew Or <[email protected]>
Date:   2016-01-09T01:45:48Z

    Fix DAGSchedulerSuite
    
    The fake accumulator values should no longer all be Longs. Ugh.

commit d531f3fff759dbfca6f24eb41ec37428ea055785
Author: Andrew Or <[email protected]>
Date:   2016-01-09T06:51:17Z

    Simplify accumulator update code a little

----


