git commit: [SPARK-4148][PySpark] fix seed distribution and add some tests for rdd.sample

2014-11-03 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 2aca97c7c -> 3cca19622


[SPARK-4148][PySpark] fix seed distribution and add some tests for rdd.sample

The current way of seed distribution makes the random sequences from partition 
i and i+1 offset by 1.

~~~
In [14]: import random

In [15]: r1 = random.Random(10)

In [16]: r1.randint(0, 1)
Out[16]: 1

In [17]: r1.random()
Out[17]: 0.4288890546751146

In [18]: r1.random()
Out[18]: 0.5780913011344704

In [19]: r2 = random.Random(10)

In [20]: r2.randint(0, 1)
Out[20]: 1

In [21]: r2.randint(0, 1)
Out[21]: 0

In [22]: r2.random()
Out[22]: 0.5780913011344704
~~~
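
The patch below replaces the shared seed with a per-split seed (`seed ^ split`) plus a short warm-up loop. As a standalone sketch (plain `random` module, outside Spark) contrasting the two schemes — function names here are illustrative, not Spark API:

```python
import random

def old_gen(seed, split):
    # Old scheme: one shared seed; partition i discards i values, so
    # partition i+1's stream is partition i's stream shifted by one.
    r = random.Random(seed)
    for _ in range(split):
        r.randint(0, 2 ** 32 - 1)
    return r

def new_gen(seed, split):
    # New scheme: per-split seed, plus a few draws to mix nearby seeds.
    r = random.Random(seed ^ split)
    for _ in range(10):
        r.randint(0, 1)
    return r

def draws(g, n):
    return [g.randint(0, 2 ** 32 - 1) for _ in range(n)]

# Old: partition 1 replays partition 0's sequence, offset by one.
assert draws(old_gen(7, 0), 6)[1:] == draws(old_gen(7, 1), 5)

# New: neighboring partitions produce unrelated sequences.
assert draws(new_gen(7, 0), 5) != draws(new_gen(7, 1), 5)
```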

Note: The new tests are not for this bug fix.

Author: Xiangrui Meng m...@databricks.com

Closes #3010 from mengxr/SPARK-4148 and squashes the following commits:

869ae4b [Xiangrui Meng] move tests tests.py
c1bacd9 [Xiangrui Meng] fix seed distribution and add some tests for rdd.sample


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cca1962
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cca1962
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cca1962

Branch: refs/heads/master
Commit: 3cca1962207745814b9d83e791713c91b659c36c
Parents: 2aca97c
Author: Xiangrui Meng m...@databricks.com
Authored: Mon Nov 3 12:24:24 2014 -0800
Committer: Xiangrui Meng m...@databricks.com
Committed: Mon Nov 3 12:24:24 2014 -0800

--
 python/pyspark/rdd.py|  3 ---
 python/pyspark/rddsampler.py | 11 +--
 python/pyspark/tests.py  | 15 +++
 3 files changed, 20 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3cca1962/python/pyspark/rdd.py
--
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 550c9dd..4f025b9 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -316,9 +316,6 @@ class RDD(object):
 """
 Return a sampled subset of this RDD (relies on numpy and falls back
 on default random generator if numpy is unavailable).
-
->>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
-[2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
 """
 assert fraction >= 0.0, "Negative fraction value: %s" % fraction
 return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)

http://git-wip-us.apache.org/repos/asf/spark/blob/3cca1962/python/pyspark/rddsampler.py
--
diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py
index 528a181..f5c3cfd 100644
--- a/python/pyspark/rddsampler.py
+++ b/python/pyspark/rddsampler.py
@@ -40,14 +40,13 @@ class RDDSamplerBase(object):
 def initRandomGenerator(self, split):
 if self._use_numpy:
 import numpy
-self._random = numpy.random.RandomState(self._seed)
+self._random = numpy.random.RandomState(self._seed ^ split)
 else:
-self._random = random.Random(self._seed)
+self._random = random.Random(self._seed ^ split)
 
-for _ in range(0, split):
-# discard the next few values in the sequence to have a
-# different seed for the different splits
-self._random.randint(0, 2 ** 32 - 1)
+# mixing because the initial seeds are close to each other
+for _ in xrange(10):
+self._random.randint(0, 1)
 
 self._split = split
 self._rand_initialized = True
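
The numpy branch of the hunk above applies the same XOR-plus-warm-up idea to `numpy.random.RandomState`. A minimal sketch of that branch in isolation (`init_state` is an illustrative name, not Spark API):

```python
import numpy as np

def init_state(seed, split):
    # Per-split seed; warm-up draws mix the state, because neighboring
    # splits yield numerically close seeds (seed ^ 0, seed ^ 1, ...).
    rs = np.random.RandomState(seed ^ split)
    rs.randint(0, 2, size=10)  # discard a few values
    return rs

s0 = init_state(42, 0).random_sample(5)
s1 = init_state(42, 1).random_sample(5)
assert not np.array_equal(s0, s1)  # neighboring splits decorrelated
```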

http://git-wip-us.apache.org/repos/asf/spark/blob/3cca1962/python/pyspark/tests.py
--
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 37a1289..253a471 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -648,6 +648,21 @@ class RDDTests(ReusedPySparkTestCase):
 self.assertEquals(result.getNumPartitions(), 5)
 self.assertEquals(result.count(), 3)
 
+def test_sample(self):
+rdd = self.sc.parallelize(range(0, 100), 4)
+wo = rdd.sample(False, 0.1, 2).collect()
+wo_dup = rdd.sample(False, 0.1, 2).collect()
+self.assertSetEqual(set(wo), set(wo_dup))
+wr = rdd.sample(True, 0.2, 5).collect()
+wr_dup = rdd.sample(True, 0.2, 5).collect()
+self.assertSetEqual(set(wr), set(wr_dup))
+wo_s10 = rdd.sample(False, 0.3, 10).collect()
+wo_s20 = rdd.sample(False, 0.3, 20).collect()
+self.assertNotEqual(set(wo_s10), set(wo_s20))
+wr_s11 = rdd.sample(True, 0.4, 11).collect()
+wr_s21 = rdd.sample(True, 0.4, 21).collect()
+self.assertNotEqual(set(wr_s11), set(wr_s21))
+
 
 class ProfilerTests(PySparkTestCase):
 



git commit: [SPARK-4148][PySpark] fix seed distribution and add some tests for rdd.sample

2014-11-03 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 76386e1a2 -> a68321400


[SPARK-4148][PySpark] fix seed distribution and add some tests for rdd.sample

The current way of seed distribution makes the random sequences from partition 
i and i+1 offset by 1.

~~~
In [14]: import random

In [15]: r1 = random.Random(10)

In [16]: r1.randint(0, 1)
Out[16]: 1

In [17]: r1.random()
Out[17]: 0.4288890546751146

In [18]: r1.random()
Out[18]: 0.5780913011344704

In [19]: r2 = random.Random(10)

In [20]: r2.randint(0, 1)
Out[20]: 1

In [21]: r2.randint(0, 1)
Out[21]: 0

In [22]: r2.random()
Out[22]: 0.5780913011344704
~~~

Note: The new tests are not for this bug fix.

Author: Xiangrui Meng m...@databricks.com

Closes #3010 from mengxr/SPARK-4148 and squashes the following commits:

869ae4b [Xiangrui Meng] move tests tests.py
c1bacd9 [Xiangrui Meng] fix seed distribution and add some tests for rdd.sample

(cherry picked from commit 3cca1962207745814b9d83e791713c91b659c36c)
Signed-off-by: Xiangrui Meng m...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6832140
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6832140
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6832140

Branch: refs/heads/branch-1.2
Commit: a68321400c1068449698d03cebd0fbf648627133
Parents: 76386e1
Author: Xiangrui Meng m...@databricks.com
Authored: Mon Nov 3 12:24:24 2014 -0800
Committer: Xiangrui Meng m...@databricks.com
Committed: Mon Nov 3 12:24:47 2014 -0800

--
 python/pyspark/rdd.py|  3 ---
 python/pyspark/rddsampler.py | 11 +--
 python/pyspark/tests.py  | 15 +++
 3 files changed, 20 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a6832140/python/pyspark/rdd.py
--
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 550c9dd..4f025b9 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -316,9 +316,6 @@ class RDD(object):
 """
 Return a sampled subset of this RDD (relies on numpy and falls back
 on default random generator if numpy is unavailable).
-
->>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
-[2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
 """
 assert fraction >= 0.0, "Negative fraction value: %s" % fraction
 return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)

http://git-wip-us.apache.org/repos/asf/spark/blob/a6832140/python/pyspark/rddsampler.py
--
diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py
index 528a181..f5c3cfd 100644
--- a/python/pyspark/rddsampler.py
+++ b/python/pyspark/rddsampler.py
@@ -40,14 +40,13 @@ class RDDSamplerBase(object):
 def initRandomGenerator(self, split):
 if self._use_numpy:
 import numpy
-self._random = numpy.random.RandomState(self._seed)
+self._random = numpy.random.RandomState(self._seed ^ split)
 else:
-self._random = random.Random(self._seed)
+self._random = random.Random(self._seed ^ split)
 
-for _ in range(0, split):
-# discard the next few values in the sequence to have a
-# different seed for the different splits
-self._random.randint(0, 2 ** 32 - 1)
+# mixing because the initial seeds are close to each other
+for _ in xrange(10):
+self._random.randint(0, 1)
 
 self._split = split
 self._rand_initialized = True

http://git-wip-us.apache.org/repos/asf/spark/blob/a6832140/python/pyspark/tests.py
--
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 37a1289..253a471 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -648,6 +648,21 @@ class RDDTests(ReusedPySparkTestCase):
 self.assertEquals(result.getNumPartitions(), 5)
 self.assertEquals(result.count(), 3)
 
+def test_sample(self):
+rdd = self.sc.parallelize(range(0, 100), 4)
+wo = rdd.sample(False, 0.1, 2).collect()
+wo_dup = rdd.sample(False, 0.1, 2).collect()
+self.assertSetEqual(set(wo), set(wo_dup))
+wr = rdd.sample(True, 0.2, 5).collect()
+wr_dup = rdd.sample(True, 0.2, 5).collect()
+self.assertSetEqual(set(wr), set(wr_dup))
+wo_s10 = rdd.sample(False, 0.3, 10).collect()
+wo_s20 = rdd.sample(False, 0.3, 20).collect()
+self.assertNotEqual(set(wo_s10), set(wo_s20))
+wr_s11 = rdd.sample(True, 0.4, 11).collect()
+wr_s21 = rdd.sample(True, 0.4, 21).collect()
+self.assertNotEqual(set(wr_s11), set(wr_s21))