GitHub user njhwang opened a pull request:

    https://github.com/apache/spark/pull/7378

    Having pyspark's RDD.aggregate() make a deepcopy of zeroValue for each partition

    I'm relatively new to Spark and functional programming, so forgive me if 
this pull request is just a result of my misunderstanding of how Spark should 
be used.
    
    Currently, using a mutable object as `zeroValue` for `RDD.aggregate()` can
    produce unexpected results.
    
    This is because pyspark's current implementation of `RDD.aggregate()` does
    not serialize or copy `zeroValue` before handing it off to
    `RDD.mapPartitions(...).fold(...)`. As a result, on each partition a single
    reference to `zeroValue` is shared by both `RDD.mapPartitions()` and
    `RDD.fold()`. If `zeroValue` is changed in place during the
    `RDD.mapPartitions()` call, that partition's call to `RDD.fold()` starts from
    the already-modified value rather than from a clean accumulator.
    
    As an illustrative example, submit the following to `spark-submit`:
    ```
    from __future__ import print_function  # lets the script run on Python 2 and 3
    
    import collections
    
    from pyspark import SparkConf, SparkContext
    
    def updateCounter(acc, val):
        # seqOp: mutates the accumulator in place and returns it
        print('update acc:', acc)
        print('update val:', val)
        acc[val] += 1
        return acc
    
    def comboCounter(acc1, acc2):
        # combOp: adds the counts from acc2 into acc1 in place
        print('combo acc1:', acc1)
        print('combo acc2:', acc2)
        acc1.update(acc2)
        return acc1
    
    def main():
        conf = SparkConf().setMaster("local").setAppName("Aggregate with Counter")
        sc = SparkContext(conf=conf)
    
        print('======= AGGREGATING with ONE PARTITION =======')
        print(sc.parallelize(range(1, 10), 1).aggregate(
            collections.Counter(), updateCounter, comboCounter))
    
        print('======= AGGREGATING with TWO PARTITIONS =======')
        print(sc.parallelize(range(1, 10), 2).aggregate(
            collections.Counter(), updateCounter, comboCounter))
    
    if __name__ == "__main__":
        main()
    ```
    
    One probably expects this to output the following:
    ```
    Counter({1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1, 9: 1})
    ```
    
    But it instead outputs this (regardless of the number of partitions):
    ```
    Counter({1: 2, 2: 2, 3: 2, 4: 2, 5: 2, 6: 2, 7: 2, 8: 2, 9: 2})
    ```
    
    This is because (I believe) `zeroValue` gets passed correctly to each
    partition, but `RDD.mapPartitions()` updates it in place, and the same
    updated object is then used as the starting value for `RDD.fold()`. Every
    item is therefore double-counted within each partition before the
    per-partition results are finally reduced at the calling node.
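
The double-counting can be reproduced without Spark. The following is only a minimal sketch of what I believe happens inside one partition: `aggregate_one_partition_shared` is a hypothetical stand-in written for illustration, not pyspark's actual code, and the snake_case helper names are likewise made up here.

```python
import collections


def update_counter(acc, val):
    # seqOp: mutates the accumulator in place and returns it
    acc[val] += 1
    return acc


def combine_counters(acc1, acc2):
    # combOp: Counter.update() adds the counts of acc2 into acc1 in place
    acc1.update(acc2)
    return acc1


def aggregate_one_partition_shared(items, zero_value, seq_op, comb_op):
    """Hypothetical model of one partition where the map step and the
    fold step both start from the very same zero_value object."""
    # Map step: no copy is made, so acc and zero_value are one object.
    acc = zero_value
    for item in items:
        acc = seq_op(acc, item)
    # Fold step: "starts" from zero_value, which has already been mutated.
    fold_acc = zero_value
    return comb_op(acc, fold_acc)


zero = collections.Counter()
result = aggregate_one_partition_shared(
    range(1, 10), zero, update_counter, combine_counters)
print(result)
```

Because `acc` and `fold_acc` refer to the same `Counter`, the combine step adds the counter to itself, so every count comes out as 2 instead of 1.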
    
    I realize that this type of calculation is typically done with
    `RDD.mapPartitions(...).reduceByKey(...)`, but hopefully this illustrates some
    potentially confusing behavior. I also noticed that other `RDD` methods
    (namely `RDD.aggregateByKey()` and `RDD.foldByKey()`) already use this
    `deepcopy` approach to create unique copies of `zeroValue`, and that the Scala
    implementations do seem to serialize the `zeroValue` object appropriately to
    prevent this type of behavior.
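
To show the effect of the `deepcopy` approach, here is a minimal Spark-free sketch. `aggregate_one_partition_copied` and the helper names are hypothetical and only model a single partition; the sketch assumes the map step works on its own `copy.deepcopy()` of `zeroValue`, and it is not the actual patch.

```python
import collections
import copy


def update_counter(acc, val):
    # seqOp: mutates the accumulator in place and returns it
    acc[val] += 1
    return acc


def combine_counters(acc1, acc2):
    # combOp: Counter.update() adds the counts of acc2 into acc1 in place
    acc1.update(acc2)
    return acc1


def aggregate_one_partition_copied(items, zero_value, seq_op, comb_op):
    """Hypothetical model of one partition where the map step works on
    its own deep copy of zero_value."""
    # Map step: mutate a private copy, leaving zero_value untouched.
    acc = copy.deepcopy(zero_value)
    for item in items:
        acc = seq_op(acc, item)
    # Fold step: zero_value is still empty, so nothing is counted twice.
    return comb_op(acc, zero_value)


zero = collections.Counter()
result = aggregate_one_partition_copied(
    range(1, 10), zero, update_counter, combine_counters)
print(result)
```

With the copy in place, each of the values 1 through 9 is counted once, and the original `zero` object is still an empty `Counter` afterwards.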

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/njhwang/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/7378.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #7378
    
----
commit 90d15441b689c05086134a435b3168c9bbabf0b9
Author: Nicholas Hwang <[email protected]>
Date:   2015-07-13T21:08:31Z

    Made sure RDD.aggregate() makes a deepcopy of zeroValue for all partitions; 
this ensures that the mapPartitions call works with unique copies of zeroValue 
in each partition, and prevents a single reference to zeroValue being used for 
both map and fold calls on each partition (resulting in possibly unexpected 
behavior).

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

