GitHub user holdenk opened a pull request:
https://github.com/apache/spark/pull/10841
[SPARK-12469][CORE][RFC/WIP] Add Consistent Accumulators for Spark
This is an initial PR illustrating one of the possible approaches for
providing consistent accumulators in Spark.
Tasks executed on Spark workers cannot normally modify values from the
driver; accumulators are the one exception to this. Accumulators in Spark
are implemented in such a way that when a stage is recomputed (say, after cache
eviction) the accumulator is updated a second time. This makes
accumulators inside transformations harder to use for things like
counting invalid records (one of the primary potential use cases for collecting
side information during a transformation). However, in some cases counting
during re-evaluation is exactly the behaviour we want (say, when tracking total
execution time for a particular function). Spark would benefit from a version
of accumulators which does not double count even when stages are re-executed.
Motivating example:
```
val parseTime = sc.accumulator(0L)
val parseFailures = sc.accumulator(0L)
val parsedData = sc.textFile(...).flatMap { line =>
  val start = System.currentTimeMillis()
  val parsed = Try(parse(line))
  if (parsed.isFailure) parseFailures += 1
  parseTime += System.currentTimeMillis() - start
  parsed.toOption
}
parsedData.cache()
val resultA = parsedData.map(...).filter(...).count()
// some intervening code. Almost anything could happen here -- some of
// parsedData may get kicked out of the cache, or an executor where data
// was cached might get lost
val resultB = parsedData.filter(...).map(...).flatMap(...).count()
// now we look at the accumulators
```
Here we would want parseFailures to be added to only once for every
line which failed to parse. Unfortunately, the current Spark accumulator API
doesn't support this use case: if some of the cached data has been evicted,
it's possible that failures will be double counted.
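For illustration only (this is not the API proposed in this PR), here is a
minimal self-contained sketch of the deduplication idea: pending updates are
keyed by (rdd id, partition id), and the driver merges each partition's
contribution at most once, so a re-executed partition is not double counted.
All names in the sketch are made up for illustration.
```
import scala.collection.mutable

object ConsistentAccumulatorSketch {
  // A pending update from one task: which RDD and partition produced it, and the delta.
  final case class PendingUpdate(rddId: Int, partitionId: Int, delta: Long)

  // Driver-side accumulator that counts each (rddId, partitionId) at most once.
  class ConsistentLongAccumulator {
    private var total = 0L
    // Partition ids already merged, keyed by RDD id.
    private val processed = mutable.Map.empty[Int, mutable.Set[Int]]

    def merge(update: PendingUpdate): Unit = {
      val seen = processed.getOrElseUpdate(update.rddId, mutable.Set.empty[Int])
      if (seen.add(update.partitionId)) {
        total += update.delta // first result from this partition: count it
      } // otherwise this partition was re-executed: skip it
    }

    def value: Long = total
  }

  def main(args: Array[String]): Unit = {
    val acc = new ConsistentLongAccumulator
    acc.merge(PendingUpdate(rddId = 1, partitionId = 0, delta = 3L))
    acc.merge(PendingUpdate(rddId = 1, partitionId = 0, delta = 3L)) // recomputed partition, ignored
    acc.merge(PendingUpdate(rddId = 1, partitionId = 1, delta = 2L))
    println(acc.value) // prints 5, not 8
  }
}
```
The actual implementation also has to handle partially consumed partitions and
merging across stages; see the design document below for the details.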
See the full design document at
https://docs.google.com/document/d/1lR_l1g3zMVctZXrcVjFusq2iQVpr4XvRK_UUDsDr6nk/edit?usp=sharing
cc @squito
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/holdenk/spark SPARK-12469-consistent-accumulators-for-spark
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/10841.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #10841
----
commit 7c9f874d94085f60089cda8ab3b6c451bf793a1a
Author: Holden Karau <[email protected]>
Date: 2015-12-28T19:59:30Z
Start adding the proposed external API for consistent accumulators
commit fce92981371ddd7af8ad28f30537fb473bbee378
Author: Holden Karau <[email protected]>
Date: 2015-12-28T19:59:47Z
Start keeping track of the required information in the taskcontext
commit c0e2bfd9d3d5f22e9ea217a500b67e84679f519d
Author: Holden Karau <[email protected]>
Date: 2015-12-30T00:29:53Z
A bit more progress towards consistent accumulators: add an add function
which checks the bitset, and throw an exception on merges (perhaps this isn't
reasonable and then we need to keep all of the previous values - but a quick
skim suggests it's intended to be a user accessible function, so just don't
support it).
commit e9c287f682cbefc531a8fc84b00a472f860c9d06
Author: Holden Karau <[email protected]>
Date: 2015-12-30T01:51:53Z
Waaait, that was not thinking clearly. As we go, a consistent
accumulator adds values to a hashmap of pending values to be merged,
keyed by the rdd id & partition id - then driver side, when merging the
accumulators, check each rdd & partition id individually against the
bitset of processed partitions for that rdd, adding to a single accumulator.
(This way we don't need to keep all of the values around forever, but if we say
have first() and then foreach() we can add the value for partition one, then
when the second task happens we can skip adding partition one again.) Up
next: start adding some simple tests for this to see if it works.
commit cf32dc5d543d4c0bb883e00692d21f22a7470737
Author: Holden Karau <[email protected]>
Date: 2016-01-06T07:45:18Z
Merge branch 'master' into SPARK-12469-consistent-accumulators-for-spark
commit d9354b12977bdaa2bcd82730fb34bdae490ab02d
Author: Holden Karau <[email protected]>
Date: 2016-01-06T20:26:13Z
add consistentValue (temporary) to get the consistent value and fix the
accumulator to work even when no merge happens
commit 28a5754b654441ad8904f6a8f6e4a06db3908541
Author: Holden Karau <[email protected]>
Date: 2016-01-06T20:58:07Z
Introduce a GenericAccumulable to allow for Accumulables which don't return
the same value as the value accumulated (e.g. consistent accumulators accumulate
a lot of bookkeeping information which the user shouldn't know about, so hide
that). Accumulable now extends GenericAccumulable to keep the API the same.
commit 21387e1c5ebf10d6a2b26996ba0a54640925a903
Author: Holden Karau <[email protected]>
Date: 2016-01-07T07:37:31Z
Temporary debugging
commit 11ac8364c4077ee14c2284f6d3a17341efb79b61
Author: Holden Karau <[email protected]>
Date: 2016-01-07T07:38:36Z
Add some tests for consistent accumulators
commit b76218d1645b58be587e9db1337b2867b20dcaf1
Author: Holden Karau <[email protected]>
Date: 2016-01-07T09:11:38Z
Switch to doing explicit passing in the API for now (can figure out the
magic later if we want magic). Next up: handle partially consumed partitions.
commit 7fb981c9f1503747b687f31d37a11b22c87fcf66
Author: Holden Karau <[email protected]>
Date: 2016-01-07T19:22:39Z
Get rid of some old junk added when trying the old approach
commit c75b037cdffd05c415e8e3dbe247f921009e31d6
Author: Holden Karau <[email protected]>
Date: 2016-01-07T21:39:01Z
Have tasks report if they've processed the entire partition and skip
incrementing the consistent accumulator if this is not the case
commit df89265241891380810860ad0f26e653efed298d
Author: Holden Karau <[email protected]>
Date: 2016-01-07T22:54:56Z
Regardless of how much of the iterator we read in the task, if we are using
persistence then the entire partition gets computed
commit 33b45411c4f7ac45c439e020d82db95f8333e0bd
Author: Holden Karau <[email protected]>
Date: 2016-01-07T23:20:34Z
Some improvements
commit b2d0926eac596b0f075db7809df1130bd7025d00
Author: Holden Karau <[email protected]>
Date: 2016-01-11T23:07:13Z
Merge in master
commit 4fd97a0a85f4e2c3a0e9b7edf2deb0a957443c4d
Author: Holden Karau <[email protected]>
Date: 2016-01-12T18:53:01Z
Style fixes - todo check some indentation and such
commit f12608776d5359636065b2e9dd7c3c3d1eed930a
Author: Holden Karau <[email protected]>
Date: 2016-01-12T19:30:59Z
Fix some indentation
commit 11da3d30d148e474595e95465c1dd2d81ca1d43f
Author: Holden Karau <[email protected]>
Date: 2016-01-14T00:07:56Z
Fix whitespace, add an explicit test for a single partition, remove setting
the when adding an accumulator value since we always have a merge with the zero
val
commit b7637c94f0fd694c61ccba2e8876352e472c3795
Author: Holden Karau <[email protected]>
Date: 2016-01-14T00:11:14Z
Remove unnecessary printlns
commit 82e8cb33ca1aef621d11083a30c16baf881ec90e
Author: Holden Karau <[email protected]>
Date: 2016-01-17T20:27:55Z
Merge branch 'master' into SPARK-12469-consistent-accumulators-for-spark
commit 6304270f42bc9ad5e1aea6021b122d4d85d47c38
Author: Holden Karau <[email protected]>
Date: 2016-01-17T20:38:03Z
Fix indentation
commit 6557e5db63a59af527d0f90447d3a40309ef9d87
Author: Holden Karau <[email protected]>
Date: 2016-01-17T20:38:40Z
Fix documentation for consistent accumulators example, and be consistent
about using splitID
commit 922d12c17cb213db596b60736f975f3a52d71196
Author: Holden Karau <[email protected]>
Date: 2016-01-17T20:39:07Z
Clarify when includeConsistent should be true/false in the javadoc of
TaskContext
commit 2c7cba021cec3ea0f1f708e4bb0f4291648d12ac
Author: Holden Karau <[email protected]>
Date: 2016-01-17T20:45:55Z
Remove some unused imports
commit a842ed23b8994aa1a109f32a742e5f98c0661599
Author: Holden Karau <[email protected]>
Date: 2016-01-17T20:46:29Z
rename consistentaccumulators.scala to consistentaccumulatorssuite.scala
commit 1a7cce9d339bb0debf456e012f07f8b6a2dc5b69
Author: Holden Karau <[email protected]>
Date: 2016-01-17T20:52:41Z
Fix indentation on multiline params
commit aaa9b89fabed40fec33c33243166eeb8a4b3feed
Author: Holden Karau <[email protected]>
Date: 2016-01-19T23:09:37Z
Merge in master - first pass
commit 211b3e7a7c9d1e08df919c12fb8e7e6af76eb640
Author: Holden Karau <[email protected]>
Date: 2016-01-19T23:27:18Z
Fix merge and add flatMapWithAccumulator
commit 3f96429b5c6a6c5596d70a9d15de74ae6203c970
Author: Holden Karau <[email protected]>
Date: 2016-01-19T23:33:36Z
Remove extra line
----