Repository: spark Updated Branches: refs/heads/master 5aa2710c1 -> 8d4940092
[SPARK-12353][STREAMING][PYSPARK] Fix countByValue inconsistent output in Python API The semantics of Python countByValue is different from Scala API, it is more like countDistinctValue, so here change to make it consistent with Scala/Java API. Author: jerryshao <ss...@hortonworks.com> Closes #10350 from jerryshao/SPARK-12353. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d494009 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d494009 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d494009 Branch: refs/heads/master Commit: 8d4940092141c0909b673607f393cdd53f093ed6 Parents: 5aa2710 Author: jerryshao <ss...@hortonworks.com> Authored: Mon Dec 28 10:43:23 2015 +0000 Committer: Sean Owen <so...@cloudera.com> Committed: Mon Dec 28 10:43:23 2015 +0000 ---------------------------------------------------------------------- python/pyspark/streaming/dstream.py | 4 ++-- python/pyspark/streaming/tests.py | 17 ++++++++++++++--- 2 files changed, 16 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8d494009/python/pyspark/streaming/dstream.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index adc2651..86447f5 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -247,7 +247,7 @@ class DStream(object): Return a new DStream in which each RDD contains the counts of each distinct value in each RDD of this DStream. """ - return self.map(lambda x: (x, None)).reduceByKey(lambda x, y: None).count() + return self.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y) def saveAsTextFiles(self, prefix, suffix=None): """ @@ -493,7 +493,7 @@ class DStream(object): keyed = self.map(lambda x: (x, 1)) counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub, windowDuration, slideDuration, numPartitions) - return counted.filter(lambda kv: kv[1] > 0).count() + return counted.filter(lambda kv: kv[1] > 0) def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None): """ http://git-wip-us.apache.org/repos/asf/spark/blob/8d494009/python/pyspark/streaming/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 4949cd6..86b05d9 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -279,8 +279,10 @@ class BasicOperationTests(PySparkStreamingTestCase): def func(dstream): return dstream.countByValue() - expected = [[4], [4], [3]] - self._test_func(input, func, expected) + expected = [[(1, 2), (2, 2), (3, 2), (4, 2)], + [(5, 2), (6, 2), (7, 1), (8, 1)], + [("a", 2), ("b", 1), ("", 1)]] + self._test_func(input, func, expected, sort=True) def test_groupByKey(self): """Basic operation test for DStream.groupByKey.""" @@ -651,7 +653,16 @@ class WindowFunctionTests(PySparkStreamingTestCase): def func(dstream): return dstream.countByValueAndWindow(2.5, .5) - expected = [[1], [2], [3], [4], [5], [6], [6], [6], [6], [6]] + expected = [[(0, 1)], + [(0, 2), (1, 1)], + [(0, 3), (1, 2), (2, 1)], + [(0, 4), (1, 3), (2, 2), (3, 1)], + [(0, 5), (1, 4), (2, 3), (3, 2), (4, 1)], + [(0, 5), (1, 5), (2, 4), (3, 3), (4, 2), (5, 1)], + [(0, 4), (1, 4), (2, 4), (3, 3), (4, 2), (5, 1)], + [(0, 3), (1, 3), (2, 3), (3, 3), (4, 2), (5, 1)], + [(0, 2), (1, 2), (2, 2), (3, 2), (4, 2), (5, 1)], + [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1)]] self._test_func(input, func, expected) def test_group_by_key_and_window(self): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org