GitHub user njhwang opened a pull request:
https://github.com/apache/spark/pull/7378
Having pyspark's RDD.aggregate() make a deepcopy of zeroValue for each partition
I'm relatively new to Spark and functional programming, so forgive me if
this pull request is just a result of my misunderstanding of how Spark should
be used.
Currently, if one happens to use a mutable object as `zeroValue` for
`RDD.aggregate()`, unexpected behavior can occur.
This is because pyspark's current implementation of `RDD.aggregate()` does
not serialize or copy `zeroValue` before handing it off to
`RDD.mapPartitions(...).fold(...)`, so a single reference to `zeroValue` is
shared by both `RDD.mapPartitions()` and `RDD.fold()` on each partition. As a
result, each partition's call to `RDD.fold()` can be fed a surprising
accumulator value, since `zeroValue` may already have been mutated in place
during the `RDD.mapPartitions()` call.
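For context, the pattern looks roughly like the following (a simplified sketch
of the behavior described above, not the verbatim pyspark source):
```
# Simplified sketch (assumed shape, not the exact pyspark source) of how
# aggregate() shares a single zeroValue reference between the map and fold steps.
def aggregate(self, zeroValue, seqOp, combOp):
    def func(iterator):
        acc = zeroValue              # same object that fold() receives below
        for obj in iterator:
            acc = seqOp(acc, obj)    # seqOp may mutate zeroValue in place
        yield acc
    # fold() starts from the very same (possibly mutated) zeroValue object
    return self.mapPartitions(func).fold(zeroValue, combOp)
```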
As an illustrative example, submit the following to `spark-submit`:
```
from pyspark import SparkConf, SparkContext
import collections

def updateCounter(acc, val):
    print 'update acc:', acc
    print 'update val:', val
    acc[val] += 1
    return acc

def comboCounter(acc1, acc2):
    print 'combo acc1:', acc1
    print 'combo acc2:', acc2
    acc1.update(acc2)
    return acc1

def main():
    conf = SparkConf().setMaster("local").setAppName("Aggregate with Counter")
    sc = SparkContext(conf=conf)
    print '======= AGGREGATING with ONE PARTITION ======='
    print sc.parallelize(range(1, 10), 1).aggregate(collections.Counter(), updateCounter, comboCounter)
    print '======= AGGREGATING with TWO PARTITIONS ======='
    print sc.parallelize(range(1, 10), 2).aggregate(collections.Counter(), updateCounter, comboCounter)

if __name__ == "__main__":
    main()
```
One probably expects this to output the following:
```
Counter({1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1, 9: 1})
```
But it instead outputs this (regardless of the number of partitions):
```
Counter({1: 2, 2: 2, 3: 2, 4: 2, 5: 2, 6: 2, 7: 2, 8: 2, 9: 2})
```
This is because (I believe) `zeroValue` gets passed correctly to each
partition, but by the time `RDD.mapPartitions()` completes, the `zeroValue`
object has already been updated in place. That same object is then passed to
`RDD.fold()`, so every item ends up double-counted within each partition before
the partition results are finally reduced at the calling node.
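The same double counting can be reproduced without Spark at all. The following
plain-Python sketch (hypothetical, simply mimicking the map-then-fold sequence
described above on a single partition) shows the aliasing at work:
```
import collections

zero = collections.Counter()

# "mapPartitions" phase: the seqOp mutates the shared zeroValue in place
partition_result = zero                  # same object, not a copy
for val in range(1, 10):
    partition_result[val] += 1           # zero now holds {1: 1, ..., 9: 1} as well

# "fold" phase: starts from the very same, already mutated zeroValue object
folded = zero
folded.update(partition_result)          # merges the Counter into itself

print(folded)                            # Counter({1: 2, 2: 2, ..., 9: 2})
```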
I realize that this type of calculation is typically done with
`RDD.mapPartitions(...).reduceByKey(...)`, but hopefully this illustrates some
potentially confusing behavior. I also noticed that other `RDD` methods (namely
`RDD.aggregateByKey()` and `RDD.foldByKey()`) already use this `deepcopy`
approach to create unique copies of `zeroValue`, and that the Scala
implementations do seem to serialize the `zeroValue` object appropriately to
prevent this type of behavior.
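The change proposed here is correspondingly small. Roughly (a sketch of the
idea from the commit below, mirroring what `RDD.aggregateByKey()` already does,
rather than the exact diff):
```
from copy import deepcopy

# Sketch of the proposed change (not the exact diff): each partition works on
# its own deep copy of zeroValue, so the object handed to fold() is untouched.
def aggregate(self, zeroValue, seqOp, combOp):
    def func(iterator):
        acc = deepcopy(zeroValue)        # unique copy per partition
        for obj in iterator:
            acc = seqOp(acc, obj)
        yield acc
    return self.mapPartitions(func).fold(zeroValue, combOp)
```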
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/njhwang/spark master
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/7378.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #7378
----
commit 90d15441b689c05086134a435b3168c9bbabf0b9
Author: Nicholas Hwang <[email protected]>
Date: 2015-07-13T21:08:31Z
Made sure RDD.aggregate() makes a deepcopy of zeroValue for all partitions;
this ensures that the mapPartitions call works with unique copies of zeroValue
in each partition, and prevents a single reference to zeroValue being used for
both map and fold calls on each partition (resulting in possibly unexpected
behavior).
----