git commit: [SPARK-4148][PySpark] fix seed distribution and add some tests for rdd.sample
Repository: spark Updated Branches: refs/heads/master 2aca97c7c - 3cca19622 [SPARK-4148][PySpark] fix seed distribution and add some tests for rdd.sample The current way of seed distribution makes the random sequences from partition i and i+1 offset by 1. ~~~ In [14]: import random In [15]: r1 = random.Random(10) In [16]: r1.randint(0, 1) Out[16]: 1 In [17]: r1.random() Out[17]: 0.4288890546751146 In [18]: r1.random() Out[18]: 0.5780913011344704 In [19]: r2 = random.Random(10) In [20]: r2.randint(0, 1) Out[20]: 1 In [21]: r2.randint(0, 1) Out[21]: 0 In [22]: r2.random() Out[22]: 0.5780913011344704 ~~~ Note: The new tests are not for this bug fix. Author: Xiangrui Meng m...@databricks.com Closes #3010 from mengxr/SPARK-4148 and squashes the following commits: 869ae4b [Xiangrui Meng] move tests tests.py c1bacd9 [Xiangrui Meng] fix seed distribution and add some tests for rdd.sample Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cca1962 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cca1962 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cca1962 Branch: refs/heads/master Commit: 3cca1962207745814b9d83e791713c91b659c36c Parents: 2aca97c Author: Xiangrui Meng m...@databricks.com Authored: Mon Nov 3 12:24:24 2014 -0800 Committer: Xiangrui Meng m...@databricks.com Committed: Mon Nov 3 12:24:24 2014 -0800 -- python/pyspark/rdd.py| 3 --- python/pyspark/rddsampler.py | 11 +-- python/pyspark/tests.py | 15 +++ 3 files changed, 20 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3cca1962/python/pyspark/rdd.py -- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 550c9dd..4f025b9 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -316,9 +316,6 @@ class RDD(object): Return a sampled subset of this RDD (relies on numpy and falls back on default random generator if numpy is unavailable). - - sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP -[2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98] assert fraction = 0.0, Negative fraction value: %s % fraction return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True) http://git-wip-us.apache.org/repos/asf/spark/blob/3cca1962/python/pyspark/rddsampler.py -- diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index 528a181..f5c3cfd 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -40,14 +40,13 @@ class RDDSamplerBase(object): def initRandomGenerator(self, split): if self._use_numpy: import numpy -self._random = numpy.random.RandomState(self._seed) +self._random = numpy.random.RandomState(self._seed ^ split) else: -self._random = random.Random(self._seed) +self._random = random.Random(self._seed ^ split) -for _ in range(0, split): -# discard the next few values in the sequence to have a -# different seed for the different splits -self._random.randint(0, 2 ** 32 - 1) +# mixing because the initial seeds are close to each other +for _ in xrange(10): +self._random.randint(0, 1) self._split = split self._rand_initialized = True http://git-wip-us.apache.org/repos/asf/spark/blob/3cca1962/python/pyspark/tests.py -- diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 37a1289..253a471 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -648,6 +648,21 @@ class RDDTests(ReusedPySparkTestCase): self.assertEquals(result.getNumPartitions(), 5) self.assertEquals(result.count(), 3) +def test_sample(self): +rdd = self.sc.parallelize(range(0, 100), 4) +wo = rdd.sample(False, 0.1, 2).collect() +wo_dup = rdd.sample(False, 0.1, 2).collect() +self.assertSetEqual(set(wo), set(wo_dup)) +wr = rdd.sample(True, 0.2, 5).collect() +wr_dup = rdd.sample(True, 0.2, 5).collect() +self.assertSetEqual(set(wr), set(wr_dup)) +wo_s10 = rdd.sample(False, 0.3, 10).collect() +wo_s20 = rdd.sample(False, 0.3, 20).collect() +self.assertNotEqual(set(wo_s10), set(wo_s20)) +wr_s11 = rdd.sample(True, 0.4, 11).collect() +wr_s21 = rdd.sample(True, 0.4, 21).collect() +self.assertNotEqual(set(wr_s11), set(wr_s21)) + class ProfilerTests(PySparkTestCase):
git commit: [SPARK-4148][PySpark] fix seed distribution and add some tests for rdd.sample
Repository: spark Updated Branches: refs/heads/branch-1.2 76386e1a2 - a68321400 [SPARK-4148][PySpark] fix seed distribution and add some tests for rdd.sample The current way of seed distribution makes the random sequences from partition i and i+1 offset by 1. ~~~ In [14]: import random In [15]: r1 = random.Random(10) In [16]: r1.randint(0, 1) Out[16]: 1 In [17]: r1.random() Out[17]: 0.4288890546751146 In [18]: r1.random() Out[18]: 0.5780913011344704 In [19]: r2 = random.Random(10) In [20]: r2.randint(0, 1) Out[20]: 1 In [21]: r2.randint(0, 1) Out[21]: 0 In [22]: r2.random() Out[22]: 0.5780913011344704 ~~~ Note: The new tests are not for this bug fix. Author: Xiangrui Meng m...@databricks.com Closes #3010 from mengxr/SPARK-4148 and squashes the following commits: 869ae4b [Xiangrui Meng] move tests tests.py c1bacd9 [Xiangrui Meng] fix seed distribution and add some tests for rdd.sample (cherry picked from commit 3cca1962207745814b9d83e791713c91b659c36c) Signed-off-by: Xiangrui Meng m...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6832140 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6832140 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6832140 Branch: refs/heads/branch-1.2 Commit: a68321400c1068449698d03cebd0fbf648627133 Parents: 76386e1 Author: Xiangrui Meng m...@databricks.com Authored: Mon Nov 3 12:24:24 2014 -0800 Committer: Xiangrui Meng m...@databricks.com Committed: Mon Nov 3 12:24:47 2014 -0800 -- python/pyspark/rdd.py| 3 --- python/pyspark/rddsampler.py | 11 +-- python/pyspark/tests.py | 15 +++ 3 files changed, 20 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a6832140/python/pyspark/rdd.py -- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 550c9dd..4f025b9 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -316,9 +316,6 @@ class RDD(object): Return a sampled subset of this RDD (relies on numpy and falls back on default random generator if numpy is unavailable). - - sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP -[2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98] assert fraction = 0.0, Negative fraction value: %s % fraction return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True) http://git-wip-us.apache.org/repos/asf/spark/blob/a6832140/python/pyspark/rddsampler.py -- diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index 528a181..f5c3cfd 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -40,14 +40,13 @@ class RDDSamplerBase(object): def initRandomGenerator(self, split): if self._use_numpy: import numpy -self._random = numpy.random.RandomState(self._seed) +self._random = numpy.random.RandomState(self._seed ^ split) else: -self._random = random.Random(self._seed) +self._random = random.Random(self._seed ^ split) -for _ in range(0, split): -# discard the next few values in the sequence to have a -# different seed for the different splits -self._random.randint(0, 2 ** 32 - 1) +# mixing because the initial seeds are close to each other +for _ in xrange(10): +self._random.randint(0, 1) self._split = split self._rand_initialized = True http://git-wip-us.apache.org/repos/asf/spark/blob/a6832140/python/pyspark/tests.py -- diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 37a1289..253a471 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -648,6 +648,21 @@ class RDDTests(ReusedPySparkTestCase): self.assertEquals(result.getNumPartitions(), 5) self.assertEquals(result.count(), 3) +def test_sample(self): +rdd = self.sc.parallelize(range(0, 100), 4) +wo = rdd.sample(False, 0.1, 2).collect() +wo_dup = rdd.sample(False, 0.1, 2).collect() +self.assertSetEqual(set(wo), set(wo_dup)) +wr = rdd.sample(True, 0.2, 5).collect() +wr_dup = rdd.sample(True, 0.2, 5).collect() +self.assertSetEqual(set(wr), set(wr_dup)) +wo_s10 = rdd.sample(False, 0.3, 10).collect() +wo_s20 = rdd.sample(False, 0.3, 20).collect() +self.assertNotEqual(set(wo_s10), set(wo_s20)) +wr_s11 = rdd.sample(True, 0.4, 11).collect() +wr_s21 = rdd.sample(True, 0.4,