spark git commit: [SPARK-6919] [PYSPARK] Add asDict method to StatCounter
Repository: spark
Updated Branches:
  refs/heads/master ab41864f9 -> 7d399c9da

[SPARK-6919] [PYSPARK] Add asDict method to StatCounter

Add a method to easily convert a StatCounter instance into a Python dict.

https://issues.apache.org/jira/browse/SPARK-6919

Note: This is my original work and the existing Spark license applies.

Author: Erik Shilts

Closes #5516 from eshilts/statcounter-asdict.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d399c9d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d399c9d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d399c9d
Branch: refs/heads/master
Commit: 7d399c9daa6769ab234890c551e1b3456e0e6e85
Parents: ab41864
Author: Erik Shilts
Authored: Tue Sep 29 13:38:15 2015 -0700
Committer: Davies Liu
Committed: Tue Sep 29 13:38:15 2015 -0700

----------------------------------------------------------------------
 python/pyspark/statcounter.py | 22 ++++++++++++++++++++++
 python/pyspark/tests.py       | 20 ++++++++++++++++++++
 2 files changed, 42 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/7d399c9d/python/pyspark/statcounter.py

diff --git a/python/pyspark/statcounter.py b/python/pyspark/statcounter.py
index 0fee3b2..03ea0b6 100644
--- a/python/pyspark/statcounter.py
+++ b/python/pyspark/statcounter.py
@@ -131,6 +131,28 @@ class StatCounter(object):
     def sampleStdev(self):
         return sqrt(self.sampleVariance())
 
+    def asDict(self, sample=False):
+        """Returns the :class:`StatCounter` members as a ``dict``.
+
+        >>> sc.parallelize([1., 2., 3., 4.]).stats().asDict()
+        {'count': 4L,
+         'max': 4.0,
+         'mean': 2.5,
+         'min': 1.0,
+         'stdev': 1.2909944487358056,
+         'sum': 10.0,
+         'variance': 1.6666666666666667}
+        """
+        return {
+            'count': self.count(),
+            'mean': self.mean(),
+            'sum': self.sum(),
+            'min': self.min(),
+            'max': self.max(),
+            'stdev': self.stdev() if sample else self.sampleStdev(),
+            'variance': self.variance() if sample else self.sampleVariance()
+        }
+
     def __repr__(self):
         return ("(count: %s, mean: %s, stdev: %s, max: %s, min: %s)" %
                 (self.count(), self.mean(), self.stdev(), self.max(), self.min()))

http://git-wip-us.apache.org/repos/asf/spark/blob/7d399c9d/python/pyspark/tests.py

diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index f11aaf0..63cc87e 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1976,6 +1976,26 @@ class NumPyTests(PySparkTestCase):
         self.assertSequenceEqual([3.0, 3.0], s.max().tolist())
         self.assertSequenceEqual([1.0, 1.0], s.sampleStdev().tolist())
 
+        stats_dict = s.asDict()
+        self.assertEqual(3, stats_dict['count'])
+        self.assertSequenceEqual([2.0, 2.0], stats_dict['mean'].tolist())
+        self.assertSequenceEqual([1.0, 1.0], stats_dict['min'].tolist())
+        self.assertSequenceEqual([3.0, 3.0], stats_dict['max'].tolist())
+        self.assertSequenceEqual([6.0, 6.0], stats_dict['sum'].tolist())
+        self.assertSequenceEqual([1.0, 1.0], stats_dict['stdev'].tolist())
+        self.assertSequenceEqual([1.0, 1.0], stats_dict['variance'].tolist())
+
+        stats_sample_dict = s.asDict(sample=True)
+        self.assertEqual(3, stats_dict['count'])
+        self.assertSequenceEqual([2.0, 2.0], stats_sample_dict['mean'].tolist())
+        self.assertSequenceEqual([1.0, 1.0], stats_sample_dict['min'].tolist())
+        self.assertSequenceEqual([3.0, 3.0], stats_sample_dict['max'].tolist())
+        self.assertSequenceEqual([6.0, 6.0], stats_sample_dict['sum'].tolist())
+        self.assertSequenceEqual(
+            [0.816496580927726, 0.816496580927726], stats_sample_dict['stdev'].tolist())
+        self.assertSequenceEqual(
+            [0.6666666666666666, 0.6666666666666666], stats_sample_dict['variance'].tolist())
+
 if __name__ == "__main__":
     if not _have_scipy:
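For reference, a minimal usage sketch of the new method (not part of the commit), assuming only a running SparkContext `sc` as in the doctest above:

# Sketch: convert an RDD's summary statistics to a plain dict.
stats = sc.parallelize([1.0, 2.0, 3.0, 4.0]).stats()

d = stats.asDict()
# The default carries the *sample* stdev/variance:
# d['stdev'] == 1.2909944487358056, d['variance'] == 1.6666666666666667

d = stats.asDict(sample=True)
# sample=True switches to the population formulas, e.g.
# d['stdev'] == 1.118033988749895 (divides by n rather than n - 1).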
spark git commit: [SPARK-10782] [PYTHON] Update dropDuplicates documentation
Repository: spark
Updated Branches:
  refs/heads/master 7d399c9da -> c1ad373f2

[SPARK-10782] [PYTHON] Update dropDuplicates documentation

Documentation for dropDuplicates() and drop_duplicates() is one and the same. Resolved the error in the example for drop_duplicates using the same approach used for groupby and groupBy, by indicating that dropDuplicates and drop_duplicates are aliases.

Author: asokadiggs

Closes #8930 from asokadiggs/jira-10782.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1ad373f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1ad373f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1ad373f
Branch: refs/heads/master
Commit: c1ad373f26053e1906fce7681c03d130a642bf33
Parents: 7d399c9
Author: asokadiggs
Authored: Tue Sep 29 17:45:18 2015 -0400
Committer: Sean Owen
Committed: Tue Sep 29 17:45:18 2015 -0400

----------------------------------------------------------------------
 python/pyspark/sql/dataframe.py | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/c1ad373f/python/pyspark/sql/dataframe.py

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index b09422a..033b319 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -931,6 +931,8 @@ class DataFrame(object):
         """Return a new :class:`DataFrame` with duplicate rows removed,
         optionally only considering certain columns.
 
+        :func:`drop_duplicates` is an alias for :func:`dropDuplicates`.
+
         >>> from pyspark.sql import Row
         >>> df = sc.parallelize([ \
             Row(name='Alice', age=5, height=80), \
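A short usage sketch of the aliased pair (assuming a Spark 1.x SQLContext named `sqlContext`, as the doctest environment provides):

from pyspark.sql import Row

df = sqlContext.createDataFrame([
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=10, height=80)])

# Both spellings resolve to the same method:
df.dropDuplicates().count()                     # 2: exact duplicate row removed
df.drop_duplicates(['name', 'height']).count()  # 1: dedupe on a column subset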
spark git commit: [SPARK-10825] [CORE] [TESTS] Fix race conditions in StandaloneDynamicAllocationSuite
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 9b3014bc4 -> d54493279

[SPARK-10825] [CORE] [TESTS] Fix race conditions in StandaloneDynamicAllocationSuite

Fix the following issues in StandaloneDynamicAllocationSuite:

1. It should not assume master and workers start in order
2. It should not assume master and workers get ready at once
3. It should not assume the application is already registered with master after creating SparkContext
4. It should not access Master.app and idToApp which are not thread safe

The changes include:

* Use `eventually` to wait until master and workers are ready to fix 1 and 2
* Use `eventually` to wait until the application is registered with master to fix 3
* Use `askWithRetry[MasterStateResponse](RequestMasterState)` to get the application info to fix 4

Author: zsxwing

Closes #8914 from zsxwing/fix-StandaloneDynamicAllocationSuite.

(cherry picked from commit dba95ea03216e6b8e623db4a36e1018c6ed95538)
Signed-off-by: Andrew Or

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5449327
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5449327
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5449327
Branch: refs/heads/branch-1.5
Commit: d5449327941ed00bf1aaa07ee02113d1522ad514
Parents: 9b3014b
Author: zsxwing
Authored: Tue Sep 29 11:53:28 2015 -0700
Committer: Andrew Or
Committed: Tue Sep 29 11:53:38 2015 -0700

----------------------------------------------------------------------
 .../StandaloneDynamicAllocationSuite.scala | 305 +++++++++++++++----------
 1 file changed, 192 insertions(+), 113 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/d5449327/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala

diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 1f2a0f0..2e2fa22 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -17,10 +17,15 @@
 
 package org.apache.spark.deploy
 
+import scala.concurrent.duration._
+
 import org.mockito.Mockito.{mock, when}
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.master.ApplicationInfo
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
@@ -56,6 +61,10 @@ class StandaloneDynamicAllocationSuite
     }
     master = makeMaster()
     workers = makeWorkers(10, 2048)
+    // Wait until all workers register with master successfully
+    eventually(timeout(60.seconds), interval(10.millis)) {
+      assert(getMasterState.workers.size === numWorkers)
+    }
   }
 
   override def afterAll(): Unit = {
@@ -73,167 +82,208 @@ class StandaloneDynamicAllocationSuite
 
   test("dynamic allocation default behavior") {
     sc = new SparkContext(appConf)
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
     // request 1 more
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 2)
     // request 1 more; this one won't go through
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 3)
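The heart of this fix is ScalaTest's `eventually`, which retries an assertion until it passes or a deadline expires, instead of asserting asynchronous cluster state immediately. A self-contained Python sketch of that retry loop (the helper and the demo names are hypothetical, not Spark or ScalaTest API):

import time

def eventually(assertion, timeout=10.0, interval=0.01):
    """Retry `assertion` until it stops raising or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            return assertion()
        except AssertionError:
            if time.monotonic() >= deadline:
                raise  # out of time: surface the last failure
            time.sleep(interval)

# Toy demo: a "worker count" that reaches its target only after ~50 ms,
# standing in for workers that register with the master asynchronously.
start = time.monotonic()

def all_workers_registered():
    registered = 10 if time.monotonic() - start > 0.05 else 3
    assert registered == 10

eventually(all_workers_registered, timeout=60.0, interval=0.01)  # passes once ready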
spark git commit: [SPARK-10825] [CORE] [TESTS] Fix race conditions in StandaloneDynamicAllocationSuite
Repository: spark
Updated Branches:
  refs/heads/master 9b9fe5f7b -> dba95ea03

[SPARK-10825] [CORE] [TESTS] Fix race conditions in StandaloneDynamicAllocationSuite

Fix the following issues in StandaloneDynamicAllocationSuite:

1. It should not assume master and workers start in order
2. It should not assume master and workers get ready at once
3. It should not assume the application is already registered with master after creating SparkContext
4. It should not access Master.app and idToApp which are not thread safe

The changes include:

* Use `eventually` to wait until master and workers are ready to fix 1 and 2
* Use `eventually` to wait until the application is registered with master to fix 3
* Use `askWithRetry[MasterStateResponse](RequestMasterState)` to get the application info to fix 4

Author: zsxwing

Closes #8914 from zsxwing/fix-StandaloneDynamicAllocationSuite.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dba95ea0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dba95ea0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dba95ea0
Branch: refs/heads/master
Commit: dba95ea03216e6b8e623db4a36e1018c6ed95538
Parents: 9b9fe5f
Author: zsxwing
Authored: Tue Sep 29 11:53:28 2015 -0700
Committer: Andrew Or
Committed: Tue Sep 29 11:53:28 2015 -0700

----------------------------------------------------------------------
 .../StandaloneDynamicAllocationSuite.scala | 305 +++++++++++++++----------
 1 file changed, 192 insertions(+), 113 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/dba95ea0/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala

diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 1f2a0f0..2e2fa22 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -17,10 +17,15 @@
 
 package org.apache.spark.deploy
 
+import scala.concurrent.duration._
+
 import org.mockito.Mockito.{mock, when}
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.master.ApplicationInfo
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
@@ -56,6 +61,10 @@ class StandaloneDynamicAllocationSuite
     }
     master = makeMaster()
     workers = makeWorkers(10, 2048)
+    // Wait until all workers register with master successfully
+    eventually(timeout(60.seconds), interval(10.millis)) {
+      assert(getMasterState.workers.size === numWorkers)
+    }
   }
 
   override def afterAll(): Unit = {
@@ -73,167 +82,208 @@ class StandaloneDynamicAllocationSuite
 
   test("dynamic allocation default behavior") {
     sc = new SparkContext(appConf)
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
     // request 1 more
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 2)
     // request 1 more; this one won't go through
    assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 3)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 3)
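The other half of the fix replaces direct reads of the master's mutable fields (`Master.apps`, `idToApp`) with a message round-trip, `askWithRetry[MasterStateResponse](RequestMasterState)`, that returns a consistent snapshot. A hedged Python sketch of that snapshot-over-message pattern (all names hypothetical, not Spark's API):

import copy
import threading

class ToyMaster(object):
    """Stand-in for a component whose internals mutate on another thread."""

    def __init__(self):
        self._lock = threading.Lock()
        self._apps = []  # mutable internal state: unsafe to read directly

    def register_app(self, app_id):
        with self._lock:
            self._apps.append({'id': app_id, 'executors': 2})

    def request_state(self):
        """Like RequestMasterState: hand back a detached snapshot,
        never a reference to live internals."""
        with self._lock:
            return copy.deepcopy(self._apps)

master = ToyMaster()
master.register_app('app-20150929-0001')

apps = master.request_state()  # safe to inspect from the test thread
assert apps[0]['executors'] == 2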
spark git commit: [SPARK-10871] include number of executor failures in error msg
Repository: spark
Updated Branches:
  refs/heads/master dba95ea03 -> b7ad54ec7

[SPARK-10871] include number of executor failures in error msg

Author: Ryan Williams

Closes #8939 from ryan-williams/errmsg.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7ad54ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7ad54ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7ad54ec
Branch: refs/heads/master
Commit: b7ad54ec793af1c84973b402f5cceb88307f7996
Parents: dba95ea
Author: Ryan Williams
Authored: Tue Sep 29 13:19:46 2015 -0700
Committer: Andrew Or
Committed: Tue Sep 29 13:19:46 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/b7ad54ec/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 93621b4..07a0a45 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -345,7 +345,7 @@ private[spark] class ApplicationMaster(
       if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
         finish(FinalApplicationStatus.FAILED,
           ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
-          "Max number of executor failures reached")
+          s"Max number of executor failures ($maxNumExecutorFailures) reached")
       } else {
         logDebug("Sending progress")
         allocator.allocateResources()
spark git commit: [SPARK-10871] include number of executor failures in error msg
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 d54493279 -> 3b2387368

[SPARK-10871] include number of executor failures in error msg

Author: Ryan Williams

Closes #8939 from ryan-williams/errmsg.

(cherry picked from commit b7ad54ec793af1c84973b402f5cceb88307f7996)
Signed-off-by: Andrew Or

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3b238736
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3b238736
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3b238736
Branch: refs/heads/branch-1.5
Commit: 3b23873684fba02803af49e9b56063b38fb61eca
Parents: d544932
Author: Ryan Williams
Authored: Tue Sep 29 13:19:46 2015 -0700
Committer: Andrew Or
Committed: Tue Sep 29 13:19:52 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/3b238736/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 991b5ce..8e909e9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -345,7 +345,7 @@ private[spark] class ApplicationMaster(
       if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
         finish(FinalApplicationStatus.FAILED,
           ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
-          "Max number of executor failures reached")
+          s"Max number of executor failures ($maxNumExecutorFailures) reached")
       } else {
         logDebug("Sending progress")
         allocator.allocateResources()