SPARK-1168, Added foldByKey to pyspark. Author: Prashant Sharma <prashan...@imaginea.com>
Closes #115 from ScrapCodes/SPARK-1168/pyspark-foldByKey and squashes the following commits:

db6f67e [Prashant Sharma] SPARK-1168, Added foldByKey to pyspark.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44805058
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44805058
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44805058

Branch: refs/heads/branch-0.9
Commit: 44805058fc054f5d2f05a763bb3735c5b1afefb8
Parents: e74e79a
Author: Prashant Sharma <prashan...@imaginea.com>
Authored: Mon Mar 10 13:37:11 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Sun Mar 16 22:13:33 2014 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py | 14 ++++++++++++++
 1 file changed, 14 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/44805058/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 678b005..487bfb1 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -910,7 +910,21 @@ class RDD(object):
                     combiners[k] = mergeCombiners(combiners[k], v)
             return combiners.iteritems()
         return shuffled.mapPartitions(_mergeCombiners)
+
+    def foldByKey(self, zeroValue, func, numPartitions=None):
+        """
+        Merge the values for each key using an associative function "func" and a neutral "zeroValue"
+        which may be added to the result an arbitrary number of times, and must not change
+        the result (e.g., 0 for addition, or 1 for multiplication); each key gets a deep copy.
+
+        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
+        >>> from operator import add
+        >>> sorted(rdd.foldByKey(0, add).collect())
+        [('a', 2), ('b', 1)]
+        """
+        import copy  # fresh zero per key: a mutable zeroValue must not be shared across keys
+        return self.combineByKey(lambda v: func(copy.deepcopy(zeroValue), v), func, func, numPartitions)
 
     # TODO: support variant with custom partitioner
     def groupByKey(self, numPartitions=None):
         """